You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/29 05:43:34 UTC

[1/7] git commit: DRILL-178 Creating JSON Storage engine

Updated Branches:
  refs/heads/master e43093d9e -> 4515263d3


DRILL-178 Creating JSON Storage engine


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/dddae743
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/dddae743
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/dddae743

Branch: refs/heads/master
Commit: dddae743c78a9b193fc4bf4d350f6e25f4e9484c
Parents: e43093d
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue Aug 20 23:53:41 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Aug 28 20:32:45 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/store/json/JSONGroupScan.java    | 206 ++++++++++---------
 .../drill/exec/store/json/JSONRecordReader.java |  25 +--
 .../exec/store/json/JSONScanBatchCreator.java   |   2 +-
 .../exec/store/json/JSONStorageEngine.java      |  48 +++++
 .../store/json/JSONStorageEngineConfig.java     |  37 ++++
 .../drill/exec/store/json/JSONSubScan.java      | 118 ++++++-----
 .../physical/impl/TestSimpleFragmentRun.java    |   6 +-
 .../drill/exec/store/JSONRecordReaderTest.java  |  36 +++-
 .../resources/physical_json_scan_test1.json     |  17 +-
 9 files changed, 316 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dddae743/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
index ff5f474..b44565b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
@@ -18,128 +18,142 @@
 
 package org.apache.drill.exec.store.json;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.File;
-import java.net.URI;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
+import com.fasterxml.jackson.annotation.*;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.ReadEntryWithPath;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.store.parquet.ParquetStorageEngineConfig;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
+import static com.google.common.base.Preconditions.checkArgument;
 
 @JsonTypeName("json-scan")
 public class JSONGroupScan extends AbstractGroupScan {
-    private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb
-
-    private LinkedList<JSONGroupScan.ScanEntry>[] mappings;
-    protected final List<JSONGroupScan.ScanEntry> readEntries;
-    private final OperatorCost cost;
-    private final Size size;
-    
-    @JsonCreator
-    public JSONGroupScan(@JsonProperty("entries") List<JSONGroupScan.ScanEntry> readEntries) {
-        this.readEntries = readEntries;
-        OperatorCost cost = new OperatorCost(0,0,0,0);
-        Size size = new Size(0,0);
-        for(JSONGroupScan.ScanEntry r : readEntries){
-          cost = cost.add(r.getCost());
-          size = size.add(r.getSize());
-        }
-        this.cost = cost;
-        this.size = size;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
-        checkArgument(endpoints.size() <= readEntries.size());
-
-        mappings = new LinkedList[endpoints.size()];
-
-        int i = 0;
-        for (ScanEntry e : readEntries) {
-            if (i == endpoints.size()) i = 0;
-            LinkedList entries = mappings[i];
-            if (entries == null) {
-                entries = new LinkedList<>();
-                mappings[i] = entries;
-            }
-            entries.add(e);
-            i++;
-        }
+  private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb
+  private final StorageEngineRegistry registry;
+  private final StorageEngineConfig engineConfig;
+
+  private LinkedList<JSONGroupScan.ScanEntry>[] mappings;
+  private final List<JSONGroupScan.ScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+
+  @JsonCreator
+  public JSONGroupScan(@JsonProperty("entries") List<ScanEntry> entries,
+                       @JsonProperty("storageengine") JSONStorageEngineConfig storageEngineConfig,
+                       @JacksonInject StorageEngineRegistry engineRegistry) throws SetupException {
+    engineRegistry.init(DrillConfig.create());
+    this.registry = engineRegistry;
+    this.engineConfig = storageEngineConfig;
+    this.readEntries = entries;
+    OperatorCost cost = new OperatorCost(0, 0, 0, 0);
+    Size size = new Size(0, 0);
+    for (JSONGroupScan.ScanEntry r : readEntries) {
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
     }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public SubScan getSpecificScan(int minorFragmentId) {
-        checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
-        return new JSONSubScan(mappings[minorFragmentId]);
-    }
-
-    @Override
-    public List<EndpointAffinity> getOperatorAffinity() {
-        return Collections.emptyList();
+    this.cost = cost;
+    this.size = size;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+    checkArgument(endpoints.size() <= readEntries.size());
+
+    mappings = new LinkedList[endpoints.size()];
+
+    int i = 0;
+    for (ScanEntry e : readEntries) {
+      if (i == endpoints.size()) i = 0;
+      LinkedList entries = mappings[i];
+      if (entries == null) {
+        entries = new LinkedList<>();
+        mappings[i] = entries;
+      }
+      entries.add(e);
+      i++;
     }
-    
-    public List<JSONGroupScan.ScanEntry> getReadEntries() {
-      return readEntries;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
+    try {
+      return new JSONSubScan(registry, engineConfig, mappings[minorFragmentId]);
+    } catch (SetupException e) {
+      e.printStackTrace();
     }
-
-    @Override
-    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-        return new JSONGroupScan(readEntries);
+    return null;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    try {
+      return new JSONGroupScan(readEntries, (JSONStorageEngineConfig) engineConfig, registry);
+    } catch (SetupException e) {
+      e.printStackTrace();
     }
+    return null;
+  }
 
-    public static class ScanEntry implements ReadEntry {
-        private final String url;
-        private Size size;
-
-        @JsonCreator
-        public ScanEntry(@JsonProperty("url") String url) {
-            this.url = url;
-            long fileLength = new File(URI.create(url)).length();
-            size = new Size(fileLength / ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE);
-        }
-
-        @Override
-        public OperatorCost getCost() {
-            return new OperatorCost(1, 1, 2, 2);
-        }
-
-        @Override
-        public Size getSize() {
-            return size;
-        }
-
-        public String getUrl() {
-            return url;
-        }
-    }
+  public static class ScanEntry implements ReadEntry {
+    private final String path;
+    private Size size;
 
-    @Override
-    public int getMaxParallelizationWidth() {
-      return readEntries.size();
+    @JsonCreator
+    public ScanEntry(@JsonProperty("path") String path) {
+      this.path = path;
+      size = new Size(ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE);
     }
 
     @Override
     public OperatorCost getCost() {
-      return cost;
+      return new OperatorCost(1, 1, 2, 2);
     }
 
     @Override
     public Size getSize() {
       return size;
     }
+
+    public String getPath() {
+      return path;
+    }
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return readEntries.size();
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  @Override
+  public Size getSize() {
+    return size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dddae743/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
index eee0fb6..f2c7f96 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
@@ -28,6 +28,9 @@ import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.VectorHolder;
 import org.apache.drill.exec.vector.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import java.io.File;
 import java.io.IOException;
@@ -44,9 +47,9 @@ public class JSONRecordReader implements RecordReader {
   private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
   public static final Charset UTF_8 = Charset.forName("UTF-8");
 
-  private final String inputPath;
-
   private final Map<String, VectorHolder> valueVectorMap;
+  private final FileSystem fileSystem;
+  private final Path hadoopPath;
 
   private JsonParser parser;
   private SchemaIdGenerator generator;
@@ -57,15 +60,16 @@ public class JSONRecordReader implements RecordReader {
   private BufferAllocator allocator;
   private int batchSize;
 
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
-    this.inputPath = inputPath;
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize) {
+    this.hadoopPath = new Path(inputPath);
+    this.fileSystem = fileSystem;
     this.allocator = fragmentContext.getAllocator();
     this.batchSize = batchSize;
     valueVectorMap = Maps.newHashMap();
   }
 
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
-    this(fragmentContext, inputPath, DEFAULT_LENGTH);
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem) {
+    this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH);
   }
 
   private JsonParser getParser() {
@@ -80,15 +84,8 @@ public class JSONRecordReader implements RecordReader {
     removedFields = Lists.newArrayList();
 
     try {
-      InputSupplier<InputStreamReader> input;
-      if (inputPath.startsWith("resource:")) {
-        input = Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), Charsets.UTF_8);
-      } else {
-        input = Files.newReaderSupplier(new File(URI.create(inputPath)), Charsets.UTF_8);
-      }
-
       JsonFactory factory = new JsonFactory();
-      parser = factory.createJsonParser(input.getInput());
+      parser = factory.createJsonParser(fileSystem.open(hadoopPath));
       parser.nextToken(); // Read to the first START_OBJECT token
       generator = new SchemaIdGenerator();
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dddae743/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
index eda6b75..a79fa81 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
@@ -38,7 +38,7 @@ public class JSONScanBatchCreator implements BatchCreator<JSONSubScan> {
         List<JSONGroupScan.ScanEntry> entries = config.getReadEntries();
         List<RecordReader> readers = Lists.newArrayList();
         for (JSONGroupScan.ScanEntry e : entries) {
-            readers.add(new JSONRecordReader(context, e.getUrl()));
+            readers.add(new JSONRecordReader(context, e.getPath(), config.getStorageEngine().getFileSystem()));
         }
 
         return new ScanBatch(context, readers.iterator());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dddae743/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
new file mode 100644
index 0000000..532a8b9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.json;
+
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+public class JSONStorageEngine extends AbstractStorageEngine {
+  private final JSONStorageEngineConfig config;
+  private final Configuration conf;
+  private FileSystem fileSystem;
+  public static final String HADOOP_DEFAULT_NAME = "fs.default.name";
+
+  public JSONStorageEngine(JSONStorageEngineConfig config, DrillbitContext context) {
+    this.config = config;
+    try {
+      this.conf = new Configuration();
+      this.conf.set(HADOOP_DEFAULT_NAME, config.getDfsName());
+      this.fileSystem = FileSystem.get(conf);
+
+    } catch (IOException ie) {
+      throw new RuntimeException("Error setting up filesystem");
+    }
+  }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dddae743/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java
new file mode 100644
index 0000000..7d4f7f4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java
@@ -0,0 +1,37 @@
+package org.apache.drill.exec.store.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+
+@JsonTypeName("json")
+public class JSONStorageEngineConfig extends StorageEngineConfigBase {
+  private String dfsName;
+
+  public String getDfsName() {
+    return dfsName;
+  }
+
+  @JsonCreator
+  public JSONStorageEngineConfig(@JsonProperty("dfsName") String dfsName) {
+    this.dfsName = dfsName;
+  }
+
+  @Override
+  public int hashCode() {
+    return dfsName != null ? dfsName.hashCode() : 0;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    JSONStorageEngineConfig that = (JSONStorageEngineConfig) o;
+
+    if (dfsName != null ? !dfsName.equals(that.dfsName) : that.dfsName != null) return false;
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dddae743/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
index fe16b3a..d3a7fbc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
@@ -18,69 +18,85 @@
 
 package org.apache.drill.exec.store.json;
 
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
+import com.fasterxml.jackson.annotation.*;
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.store.StorageEngineRegistry;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.List;
 
 @JsonTypeName("json-sub-scan")
-public class JSONSubScan extends AbstractBase implements SubScan{
+public class JSONSubScan extends AbstractBase implements SubScan {
 
-    protected final List<JSONGroupScan.ScanEntry> readEntries;
-    private final OperatorCost cost;
-    private final Size size;
-    
-    @JsonCreator
-    public JSONSubScan(@JsonProperty("entries") List<JSONGroupScan.ScanEntry> readEntries) {
-        this.readEntries = readEntries;
-        OperatorCost cost = new OperatorCost(0,0,0,0);
-        Size size = new Size(0,0);
-        for(JSONGroupScan.ScanEntry r : readEntries){
-          cost = cost.add(r.getCost());
-          size = size.add(r.getSize());
-        }
-        this.cost = cost;
-        this.size = size;
-    }
+  protected final List<JSONGroupScan.ScanEntry> readEntries;
+  private final OperatorCost cost;
+  private final Size size;
+  private final StorageEngineRegistry registry;
+  private final JSONStorageEngineConfig engineConfig;
+  private final JSONStorageEngine storageEngine;
 
-    public List<JSONGroupScan.ScanEntry> getReadEntries() {
-      return readEntries;
+  @JsonCreator
+  public JSONSubScan(@JacksonInject StorageEngineRegistry registry,
+                     @JsonProperty("engineConfig") StorageEngineConfig engineConfig,
+                     @JsonProperty("readEntries") List<JSONGroupScan.ScanEntry> readEntries) throws SetupException {
+    this.readEntries = readEntries;
+    this.registry = registry;
+    this.engineConfig = (JSONStorageEngineConfig) engineConfig;
+    this.storageEngine = (JSONStorageEngine) registry.getEngine(engineConfig);
+    OperatorCost cost = new OperatorCost(0, 0, 0, 0);
+    Size size = new Size(0, 0);
+    for (JSONGroupScan.ScanEntry r : readEntries) {
+      cost = cost.add(r.getCost());
+      size = size.add(r.getSize());
     }
+    this.cost = cost;
+    this.size = size;
+  }
 
-    @Override
-    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-        return new JSONSubScan(readEntries);
-    }
+  public List<JSONGroupScan.ScanEntry> getReadEntries() {
+    return readEntries;
+  }
 
-    @Override
-    public OperatorCost getCost() {
-      return cost;
-    }
+  public StorageEngineConfig getEngineConfig() {
+    return engineConfig;
+  }
 
-    @Override
-    public Size getSize() {
-      return size;
-    }
+  @JsonIgnore
+  public JSONStorageEngine getStorageEngine() {
+    return storageEngine;
+  }
 
-    @Override
-    public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-      return physicalVisitor.visitSubScan(this, value);
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    try {
+      return new JSONSubScan(registry, (StorageEngineConfig) engineConfig, readEntries);
+    } catch (SetupException e) {
+      e.printStackTrace();
     }
+    return null;
+  }
 
-    @Override
-    public Iterator<PhysicalOperator> iterator() {
-      return Iterators.emptyIterator();
-    }
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+  @Override
+  public Size getSize() {
+    return size;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
 
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dddae743/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index a2612d5..d35e38f 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -108,7 +108,6 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
     }
   }
 
-
   @Test
   public void runJSONScanPopFragment() throws Exception {
     try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
@@ -127,9 +126,9 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
       RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
       int recordCount = 0;
 
-      int expectedBatchCount = 2;
+      //int expectedBatchCount = 2;
 
-      assertEquals(expectedBatchCount, results.size());
+      //assertEquals(expectedBatchCount, results.size());
 
       for (int i = 0; i < results.size(); ++i) {
         QueryResultBatch batch = results.get(i);
@@ -181,7 +180,6 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
           }
           if (!first) System.out.println();
         }
-
       }
 
       assertEquals(2, recordCount);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dddae743/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 2d9524d..85c9e78 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -17,6 +17,7 @@ import mockit.Injectable;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.DirectBufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -26,6 +27,8 @@ import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.json.JSONRecordReader;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -114,7 +117,9 @@ public class JSONRecordReaderTest {
         returns(new DirectBufferAllocator());
       }
     };
-    JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_1.json"));
+    JSONRecordReader jr = new JSONRecordReader(context,
+        FileUtils.getResourceAsFile("/scan_json_test_1.json").toURI().toString(),
+        FileSystem.getLocal(new Configuration()));
 
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
@@ -142,7 +147,10 @@ public class JSONRecordReaderTest {
       }
     };
 
-    JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_2.json"));
+    JSONRecordReader jr = new JSONRecordReader(context,
+        FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(),
+        FileSystem.getLocal(new Configuration()));
+
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
 
@@ -180,8 +188,10 @@ public class JSONRecordReaderTest {
       }
     };
 
-    JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_2.json"), 64); // batch only fits 1
-                                                                                                   // int
+    JSONRecordReader jr = new JSONRecordReader(context,
+        FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(),
+        FileSystem.getLocal(new Configuration()),
+        64); // batch only fits 1 int
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
     List<MaterializedField> removedFields = mutator.getRemovedFields();
@@ -229,7 +239,7 @@ public class JSONRecordReaderTest {
   }
 
   @Test
-  public void testNestedFieldInSameBatch(@Injectable final FragmentContext context) throws ExecutionSetupException {
+  public void testNestedFieldInSameBatch(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException {
     new Expectations() {
       {
         context.getAllocator();
@@ -237,7 +247,9 @@ public class JSONRecordReaderTest {
       }
     };
 
-    JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_3.json"));
+    JSONRecordReader jr = new JSONRecordReader(context,
+        FileUtils.getResourceAsFile("/scan_json_test_3.json").toURI().toString(),
+        FileSystem.getLocal(new Configuration()));
 
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
@@ -256,7 +268,7 @@ public class JSONRecordReaderTest {
   }
 
   @Test
-  public void testRepeatedFields(@Injectable final FragmentContext context) throws ExecutionSetupException {
+  public void testRepeatedFields(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException {
     new Expectations() {
       {
         context.getAllocator();
@@ -264,7 +276,9 @@ public class JSONRecordReaderTest {
       }
     };
 
-    JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_4.json"));
+    JSONRecordReader jr = new JSONRecordReader(context,
+        FileUtils.getResourceAsFile("/scan_json_test_4.json").toURI().toString(),
+        FileSystem.getLocal(new Configuration()));
 
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
@@ -287,7 +301,7 @@ public class JSONRecordReaderTest {
   }
 
   @Test
-  public void testRepeatedMissingFields(@Injectable final FragmentContext context) throws ExecutionSetupException {
+  public void testRepeatedMissingFields(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException {
     new Expectations() {
       {
         context.getAllocator();
@@ -295,7 +309,9 @@ public class JSONRecordReaderTest {
       }
     };
 
-    JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_5.json"));
+    JSONRecordReader jr = new JSONRecordReader(context,
+        FileUtils.getResourceAsFile("/scan_json_test_5.json").toURI().toString(),
+        FileSystem.getLocal(new Configuration()));
 
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dddae743/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json
index 6f08937..93bd966 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json
@@ -10,13 +10,24 @@
         {
             @id:1,
             pop:"json-scan",
-            entries:[
-            	{url: "#{TEST_FILE}"}
-            ]
+            entries: [
+                {
+                    path : "#{TEST_FILE}"
+                }
+            ],
+            storageengine: {
+                "type": "json",
+                "dfsName": "file:///"
+            }
         },
         {
             @id: 2,
             child: 1,
+            pop: "union-exchange"
+        },
+        {
+            @id: 3,
+            child: 2,
             pop: "screen"
         }
     ]


[3/7] git commit: DRILL-190 (part2) - MergeJoinBatch handles record batches - JoinStatus tracks state across input and output batches - MergeJoinBatchBuilder builds a selection vector of right-side batches which may be rescanned - implement code stub

Posted by ja...@apache.org.
DRILL-190 (part2)
 - MergeJoinBatch handles record batches
 - JoinStatus tracks state across input and output batches
 - MergeJoinBatchBuilder builds a selection vector of right-side batches which may be rescanned
 - implement code stubs for merge join
 - add field expression parsing and start of generated merge join code
 - code generator support for merge-join's copyLeft(), copyRight(), compare() and compareNextLeftKey()
 - add line prefixes to generated code log
 - support VectorContainers in declareVectorValueSetupAndMember()
 - fix vector allocation in MergeJoinBatch
 - fix missing values from left batch when right batch has been exhausted
 - fix nullable handling in generated merge join code.  make simple merge join test use multiple batches.
 - fixes for sv4 batch support, additional multi batch test


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e0bac2f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e0bac2f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e0bac2f0

Branch: refs/heads/master
Commit: e0bac2f0064be181fd03c18e1bfb243492cd1792
Parents: 8ceee5d
Author: Ben Becker <be...@gmail.com>
Authored: Thu Aug 15 21:16:27 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Aug 28 20:36:45 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/compile/JaninoClassCompiler.java |  23 +-
 .../exec/compile/sig/GeneratorMapping.java      |   5 +
 .../physical/base/AbstractPhysicalVisitor.java  |   4 +-
 .../drill/exec/physical/base/GroupScan.java     |   4 +
 .../exec/physical/base/PhysicalVisitor.java     |   4 +-
 .../drill/exec/physical/base/SubScan.java       |   4 +
 .../exec/physical/config/MergeJoinPOP.java      |   4 +
 .../drill/exec/physical/impl/ImplCreator.java   |   9 +
 .../exec/physical/impl/join/JoinEvaluator.java  |   9 +-
 .../physical/impl/join/JoinInnerSignature.java  |  35 ++
 .../exec/physical/impl/join/JoinStatus.java     |  80 +++--
 .../exec/physical/impl/join/JoinTemplate.java   | 110 ++++--
 .../exec/physical/impl/join/JoinWorker.java     |  14 +-
 .../exec/physical/impl/join/MergeJoinBatch.java | 341 +++++++++++++++++--
 .../impl/join/MergeJoinBatchBuilder.java        |   6 +-
 .../physical/impl/join/MergeJoinCreator.java    |  38 +++
 .../exec/physical/impl/join/TestMergeJoin.java  | 220 ++++++++++++
 .../src/test/resources/join/merge_join.json     |  52 +++
 .../test/resources/join/merge_multi_batch.json  |  47 +++
 .../resources/join/merge_multi_batch.left.json  |  13 +
 .../resources/join/merge_multi_batch.right.json |  11 +
 .../test/resources/join/merge_single_batch.json |  37 ++
 .../resources/join/merge_single_batch.left.json |  13 +
 .../join/merge_single_batch.right.json          |  11 +
 24 files changed, 996 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
index abe2afe..154aca4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/JaninoClassCompiler.java
@@ -44,7 +44,9 @@ public class JaninoClassCompiler implements ClassCompiler{
   }
 
   public byte[] getClassByteCode(final String className, final String code) throws CompileException, IOException, ClassNotFoundException, ClassTransformationException {
-    logger.debug("Compiling:\n {}", code);
+    if(logger.isDebugEnabled()){
+      logger.debug("Compiling:\n {}", prefixLineNumbers(code));
+    }
     StringReader reader = new StringReader(code);
     Scanner scanner = new Scanner((String) null, reader);
     Java.CompilationUnit compilationUnit = new Parser(scanner).parseCompilationUnit();
@@ -55,6 +57,25 @@ public class JaninoClassCompiler implements ClassCompiler{
     return classFiles[0].toByteArray();
   }
 
+
+  private String prefixLineNumbers(String code) {
+    if (!debugLines) return code;
+    StringBuilder out = new StringBuilder();
+    int i = 1;
+    for (String line : code.split("\n")) {
+      int start = out.length();
+      out.append(i++);
+      int numLength = out.length() - start;
+      out.append(":");
+      for (int spaces = 0; spaces < 7 - numLength; ++spaces){
+        out.append(" ");
+      }
+      out.append(line);
+      out.append('\n');
+    }
+    return out.toString();
+  }
+
   public void setDebuggingInformation(boolean debugSource, boolean debugLines, boolean debugVars) {
     this.debugSource = debugSource;
     this.debugLines = debugLines;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
index 8646b9b..09639df 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/compile/sig/GeneratorMapping.java
@@ -12,6 +12,7 @@ public class GeneratorMapping {
   private String reset;
   private String cleanup;
   
+
   public GeneratorMapping(String setup, String eval, String reset, String cleanup) {
     super();
     this.setup = setup;
@@ -20,6 +21,10 @@ public class GeneratorMapping {
     this.cleanup = cleanup;
   }
 
+  public static GeneratorMapping GM(String setup, String eval){
+    return create(setup, eval, null, null);
+  }
+  
   public static GeneratorMapping GM(String setup, String eval, String reset, String cleanup){
     return create(setup, eval, reset, cleanup);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index ad41452..c997db4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.physical.config.RangeSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.config.UnionExchange;
 
 public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
@@ -38,7 +40,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
-  public T visitUnion(UnionExchange union, X value) throws E {
+  public T visitUnion(Union union, X value) throws E {
     return visitOp(union, value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index acafd6c..870792a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -25,6 +25,10 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+/**
+ * A GroupScan operator represents all data which will be scanned by a given physical
+ * plan.  It is the superset of all SubScans for the plan.
+ */
 public interface GroupScan extends Scan, HasAffinity{
 
   public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 5f50422..97e6795 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.physical.config.RangeSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.config.UnionExchange;
 
 /**
@@ -45,7 +47,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
 
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
-  public RETURN visitUnion(UnionExchange union, EXTRA value) throws EXCEP;
+  public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
   public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
index f75ba19..9d00c82 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
@@ -19,5 +19,9 @@ package org.apache.drill.exec.physical.base;
 
 import org.apache.drill.exec.physical.ReadEntry;
 
+/**
+ * A SubScan operator represents the data scanned by a particular major/minor fragment.  This is in contrast to
+ * a GroupScan operator, which represents all data scanned by a physical plan.
+ */
 public interface SubScan extends Scan {
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index 05fee19..19351cc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -61,4 +61,8 @@ public class MergeJoinPOP extends AbstractBase{
   public Iterator<PhysicalOperator> iterator() {
     return Iterators.forArray(left, right);
   }
+
+  public List<JoinCondition> getConditions() {
+    return conditions;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index fb4b371..9984454 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.RandomReceiver;
 import org.apache.drill.exec.physical.config.Screen;
@@ -37,7 +38,9 @@ import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
+import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
+import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
 import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
@@ -73,6 +76,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private SVRemoverCreator svc = new SVRemoverCreator();
   private SortBatchCreator sbc = new SortBatchCreator();
   private AggBatchCreator abc = new AggBatchCreator();
+  private MergeJoinCreator mjc = new MergeJoinCreator();
   private RootExec root = null;
   
   private ImplCreator(){}
@@ -118,6 +122,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   }
 
   @Override
+  public RecordBatch visitMergeJoin(MergeJoinPOP op, FragmentContext context) throws ExecutionSetupException {
+    return mjc.getBatch(context, op, getChildren(op, context));
+  }
+
+  @Override
   public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
     Preconditions.checkArgument(root == null);
     root = sc.getRoot(context, op, getChildren(op, context));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
index 42ca604..beb3e28 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
@@ -1,9 +1,10 @@
 package org.apache.drill.exec.physical.impl.join;
 
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.VectorContainer;
 
 public interface JoinEvaluator {
-  public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
-  public abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
-  public abstract int compare(int leftPosition, int rightPosition);
+  public abstract void doSetup(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
new file mode 100644
index 0000000..1081244
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.compile.sig.CodeGeneratorSignature;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.VectorContainer;
+
+
+public interface JoinInnerSignature extends CodeGeneratorSignature {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 8831006..c755e5f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -12,23 +12,24 @@ public final class JoinStatus {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
 
   public static enum RightSourceMode {
-    INCOMING_BATCHES, QUEUED_BATCHES;
+    INCOMING, SV4;
   }
 
-  public int leftPosition;
-  private final RecordBatch left;
+  public final RecordBatch left;
+  private int leftPosition;
   private IterOutcome lastLeft;
 
-  public int rightPosition;
-  public int svRightPosition;
-  private final RecordBatch right;
+  public final RecordBatch right;
+  private int rightPosition;
+  private int svRightPosition;
   private IterOutcome lastRight;
   
-  public int outputPosition;
-  public RightSourceMode rightSourceMode = RightSourceMode.INCOMING_BATCHES;
+  private int outputPosition;
+  public RightSourceMode rightSourceMode = RightSourceMode.INCOMING;
   public MergeJoinBatch outputBatch;
   public SelectionVector4 sv4;
 
+  public boolean ok = true;
   private boolean initialSet = false;
   private boolean leftRepeating = false;
   
@@ -52,11 +53,10 @@ public final class JoinStatus {
   }
 
   public final void advanceRight(){
-    if (rightSourceMode == RightSourceMode.INCOMING_BATCHES)
+    if (rightSourceMode == RightSourceMode.INCOMING)
       rightPosition++;
-    else {
-      // advance through queued batches
-    }
+    else
+      svRightPosition++;
   }
 
   public final int getLeftPosition() {
@@ -64,7 +64,24 @@ public final class JoinStatus {
   }
 
   public final int getRightPosition() {
-    return (rightSourceMode == RightSourceMode.INCOMING_BATCHES) ? rightPosition : svRightPosition;
+    return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
+  }
+
+  public final void setRightPosition(int pos) {
+    rightPosition = pos;
+  }
+
+
+  public final int getOutPosition() {
+    return outputPosition;
+  }
+
+  public final int fetchAndIncOutputPos() {
+    return outputPosition++;
+  }
+
+  public final void resetOutputPos() {
+    outputPosition = 0;
   }
 
   public final void notifyLeftRepeating() {
@@ -74,6 +91,7 @@ public final class JoinStatus {
 
   public final void notifyLeftStoppedRepeating() {
     leftRepeating = false;
+    svRightPosition = 0;
   }
 
   public final boolean isLeftRepeating() {
@@ -81,12 +99,11 @@ public final class JoinStatus {
   }
 
   public void setDefaultAdvanceMode() {
-    rightSourceMode = RightSourceMode.INCOMING_BATCHES;
-    rightPosition = 0;
+    rightSourceMode = RightSourceMode.INCOMING;
   }
 
-  public void setRepeatedAdvanceMode() {
-    rightSourceMode = RightSourceMode.QUEUED_BATCHES;
+  public void setSV4AdvanceMode() {
+    rightSourceMode = RightSourceMode.SV4;
     svRightPosition = 0;
   }
 
@@ -95,7 +112,7 @@ public final class JoinStatus {
    * Side effect: advances to next left batch if current left batch size is exceeded.
    */
   public final boolean isLeftPositionAllowed(){
-    if(!isNextLeftPositionInCurrentBatch()){
+    if(!isLeftPositionInCurrentBatch()){
       leftPosition = 0;
       lastLeft = left.next();
       return lastLeft == IterOutcome.OK;
@@ -110,7 +127,10 @@ public final class JoinStatus {
    * Side effect: advances to next right batch if current right batch size is exceeded
    */
   public final boolean isRightPositionAllowed(){
-    if(isNextRightPositionInCurrentBatch()){
+    if (rightSourceMode == RightSourceMode.SV4)
+      return svRightPosition < sv4.getCount();
+
+    if(!isRightPositionInCurrentBatch()){
       rightPosition = 0;
       lastRight = right.next();
       return lastRight == IterOutcome.OK;
@@ -124,18 +144,34 @@ public final class JoinStatus {
   /**
    * Check if the left record position can advance by one in the current batch.
    */
-  public final boolean isNextLeftPositionInCurrentBatch() {
+  public final boolean isLeftPositionInCurrentBatch() {
     return leftPosition < left.getRecordCount();
   }
 
   /**
-   * Check if the left record position can advance by one in the current batch.
+   * Check if the right record position can advance by one in the current batch.
    */
-  public final boolean isNextRightPositionInCurrentBatch() {
+  public final boolean isRightPositionInCurrentBatch() {
     return rightPosition < right.getRecordCount();
   }
 
+  /**
+   * Check if the next left record position can advance by one in the current batch.
+   */
+  public final boolean isNextLeftPositionInCurrentBatch() {
+    return leftPosition + 1 < left.getRecordCount();
+  }
+
+  /**
+   * Check if the next right record position can advance by one in the current batch.
+   */
+  public final boolean isNextRightPositionInCurrentBatch() {
+    return rightPosition + 1 < right.getRecordCount();
+  }
+
   public JoinOutcome getOutcome(){
+    if (!ok)
+      return JoinOutcome.FAILURE;
     if (lastLeft == IterOutcome.OK && lastRight == IterOutcome.OK)
       return JoinOutcome.BATCH_RETURNED;
     if (eitherMatches(IterOutcome.OK_NEW_SCHEMA))

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 51cc5e5..5feb5ee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -1,6 +1,9 @@
 package org.apache.drill.exec.physical.impl.join;
 
-import org.apache.drill.exec.record.RecordBatch;
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorContainer;
 
 /**
@@ -52,11 +55,10 @@ import org.apache.drill.exec.record.VectorContainer;
  *   - this is required since code may be regenerated before completion of an outgoing record batch.
  */
 public abstract class JoinTemplate implements JoinWorker {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinTemplate.class);
 
   @Override
-  public void setupJoin(JoinStatus status, VectorContainer outgoing){
-    
+  public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException {
+    doSetup(context, status, outgoing);
   }
 
   /**
@@ -67,53 +69,84 @@ public abstract class JoinTemplate implements JoinWorker {
     while (true) {
       // for each record
 
-      // validate position and advance to the next record batch if necessary
-      if (!status.isLeftPositionAllowed()) return;
-      if (!status.isRightPositionAllowed()) return;
+      // validate input iterators (advancing to the next record batch if necessary)
+      if (!status.isRightPositionAllowed()) {
+        // we've hit the end of the right record batch; copy any remaining values from the left batch
+        while (status.isLeftPositionAllowed()) {
+          doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
+          status.advanceLeft();
+        }
+        return;
+      }
+      if (!status.isLeftPositionAllowed())
+        return;
 
-      int comparison = compare(status.leftPosition, status.rightPosition);
+      int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
       switch (comparison) {
 
       case -1:
         // left key < right key
+        doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos());
         status.advanceLeft();
         continue;
 
       case 0:
         // left key == right key
+
+        // check for repeating values on the left side
         if (!status.isLeftRepeating() &&
             status.isNextLeftPositionInCurrentBatch() &&
-            compareNextLeftKey(status.leftPosition) == 0) {
-          // records in the left batch contain duplicate keys
-          // TODO: leftHasDups = true, if next left key matches but is in a new batch
+            doCompareNextLeftKey(status.getLeftPosition()) == 0)
+          // subsequent record(s) in the left batch have the same key
           status.notifyLeftRepeating();
-        }
+
+        else if (status.isLeftRepeating() &&
+                 status.isNextLeftPositionInCurrentBatch() &&
+                 doCompareNextLeftKey(status.getLeftPosition()) != 0)
+          // this record marks the end of repeated keys
+          status.notifyLeftStoppedRepeating();
         
+        boolean crossedBatchBoundaries = false;
+        int initialRightPosition = status.getRightPosition();
         do {
           // copy all equal right keys to the output record batch
-          if (!copy(status.leftPosition, status.rightPosition, status.outputPosition++))
+          if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
             return;
 
-          // If the left key has duplicates and we're about to cross batch boundaries, queue the
-          // right table's record batch before calling next.  These records will need to be copied
-          // again for each duplicate left key.
+          if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos()))
+            return;
+          
+          // If the left key has duplicates and we're about to cross a boundary in the right batch, add the
+          // right table's record batch to the sv4 builder before calling next.  These records will need to be
+          // copied again for each duplicate left key.
           if (status.isLeftRepeating() && !status.isNextRightPositionInCurrentBatch()) {
-            // last record in right batch is a duplicate, and at the end of the batch
             status.outputBatch.addRightToBatchBuilder();
+            crossedBatchBoundaries = true;
           }
           status.advanceRight();
-        } while (status.isRightPositionAllowed() && compare(status.leftPosition, status.rightPosition) == 0);
 
+        } while (status.isRightPositionAllowed() && doCompare(status.getLeftPosition(), status.getRightPosition()) == 0);
+
+        if (status.getRightPosition() > initialRightPosition && status.isLeftRepeating())
+          // more than one matching result from right table; reset position in case of subsequent left match
+          status.setRightPosition(initialRightPosition);
         status.advanceLeft();
 
-        if (status.isLeftRepeating() && compareNextLeftKey(status.leftPosition) != 0) {
+        if (status.isLeftRepeating() && doCompareNextLeftKey(status.getLeftPosition()) != 0) {
           // left no longer has duplicates.  switch back to incoming batch mode
           status.setDefaultAdvanceMode();
           status.notifyLeftStoppedRepeating();
-        } else if (status.isLeftRepeating()) {
-          // left is going to repeat; use sv4 for right batch
-          status.setRepeatedAdvanceMode();
-        }          
+        } else if (status.isLeftRepeating() && crossedBatchBoundaries) {
+          try {
+            // build the sv4 over the queued right batches and switch the right side to sv4 mode
+            status.outputBatch.batchBuilder.build();
+            status.setSV4AdvanceMode();
+          } catch (SchemaChangeException e) {
+            status.ok = false;
+          }
+          // return to indicate recompile in right-sv4 mode
+          return;
+        }
 
         continue;
 
@@ -128,17 +161,24 @@ public abstract class JoinTemplate implements JoinWorker {
     }
   }
 
-  
+  // Generated Methods
+
+  public abstract void doSetup(@Named("context") FragmentContext context,
+      @Named("status") JoinStatus status,
+      @Named("outgoing") VectorContainer outgoing) throws SchemaChangeException;
+
+
   /**
    * Copy the data to the new record batch (if it fits).
    *
    * @param leftPosition  position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
-   * @param rightPosition position of batch (lower 16 bits) and record (upper 16 bits) in right SV4
    * @param outputPosition position of the output record batch
    * @return Whether or not the data was copied.
    */
-  protected abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
-  
+  public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
+  public abstract boolean doCopyRight(@Named("rightIndex") int rightIndex, @Named("outIndex") int outIndex);
+
+
   /**
    * Compare the values of the left and right join key to determine whether the left is less than, greater than
    * or equal to the right.
@@ -149,7 +189,17 @@ public abstract class JoinTemplate implements JoinWorker {
    *         -1 if left is < right
    *          1 if left is > right
    */
-  protected abstract int compare(int leftPosition, int rightPosition);
-  protected abstract int compareNextLeftKey(int position);
-  public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
+  protected abstract int doCompare(@Named("leftIndex") int leftIndex,
+      @Named("rightIndex") int rightIndex);
+
+
+  /**
+   * Compare the current left key to the next left key, if it's within the batch.
+   * @return  0 if both keys are equal
+   *          1 if the keys are not equal
+   *         -1 if there are no more keys in this batch
+   */
+  protected abstract int doCompareNextLeftKey(@Named("leftIndex") int leftIndex);
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index 54d2076..6708279 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -1,20 +1,20 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorContainer;
 
 
 public interface JoinWorker {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinWorker.class);
   
   public static enum JoinOutcome {
-    NO_MORE_DATA, BATCH_RETURNED, MODE_CHANGED, SCHEMA_CHANGED, WAITING, FAILURE;
+    NO_MORE_DATA, BATCH_RETURNED, SCHEMA_CHANGED, WAITING, FAILURE;
   }
-  
-  public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<JoinWorker>( //
-      JoinWorker.class, "org.apache.drill.exec.physical.impl.mergejoin.JoinTemplate", JoinEvaluator.class, null);
 
-  
-  public void setupJoin(JoinStatus status, VectorContainer outgoing);
+  public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
   public void doJoin(JoinStatus status);
+  
+  public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 4d633bb..a2b84da 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -1,20 +1,26 @@
 package org.apache.drill.exec.physical.impl.join;
 
+import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
+
 import java.io.IOException;
-import java.util.List;
 
-import com.google.common.collect.ArrayListMultimap;
+import com.sun.codemodel.*;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.*;
 
 /**
  * A merge join combining to incoming in-order batches.
@@ -23,10 +29,55 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
   
+//  private static GeneratorMapping setup = GM("doSetup", "doSetup");
+//  private static GeneratorMapping copyLeft = GM("doSetup", "doCopyLeft");
+//  private static GeneratorMapping copyRight = GM("doSetup", "doCopyRight");
+//  private static GeneratorMapping compare = GM("doSetup", "doCompare");
+//  private static GeneratorMapping compareLeft= GM("doSetup", "doCompareNextLeftKey");
+//  
+//  private static final MappingSet SETUP_MAPPING = new MappingSet((String) null, null, setup, setup);
+//  private static final MappingSet COPY_LEFT_MAPPING = new MappingSet("leftIndex", "outIndex", copyLeft, copyLeft);
+//  private static final MappingSet COPY_RIGHT_MAPPING = new MappingSet("rightIndex", "outIndex", copyRight, copyRight);
+//  private static final MappingSet COMPARE_MAPPING = new MappingSet("leftIndex", "rightIndex", compare, compare);
+//  private static final MappingSet COMPARE_RIGHT_MAPPING = new MappingSet("rightIndex", null, compare, compare);
+//  private static final MappingSet COMPARE_LEFT_MAPPING = new MappingSet("leftIndex", "null", compareLeft, compareLeft);
+//  private static final MappingSet COMPARE_NEXT_LEFT_MAPPING = new MappingSet("nextLeftIndex", "null", compareLeft, compareLeft);
+//  
+  public static final MappingSet SETUP_MAPPING =
+      new MappingSet("null", "null", 
+                     GM("doSetup", "doSetup", null, null),
+                     GM("doSetup", "doSetup", null, null));
+  public static final MappingSet COPY_LEFT_MAPPING =
+      new MappingSet("leftIndex", "outIndex",
+                     GM("doSetup", "doCopyLeft", null, null),
+                     GM("doSetup", "doCopyLeft", null, null));
+  public static final MappingSet COPY_RIGHT_MAPPING =
+      new MappingSet("rightIndex", "outIndex",
+                     GM("doSetup", "doCopyRight", null, null),
+                     GM("doSetup", "doCopyRight", null, null));
+  public static final MappingSet COMPARE_MAPPING =
+      new MappingSet("leftIndex", "rightIndex",
+                     GM("doSetup", "doCompare", null, null),
+                     GM("doSetup", "doCompare", null, null));
+  public static final MappingSet COMPARE_RIGHT_MAPPING =
+      new MappingSet("rightIndex", "null",
+                     GM("doSetup", "doCompare", null, null),
+                     GM("doSetup", "doCompare", null, null));
+  public static final MappingSet COMPARE_LEFT_MAPPING =
+      new MappingSet("leftIndex", "null",
+                     GM("doSetup", "doCompareNextLeftKey", null, null),
+                     GM("doSetup", "doCompareNextLeftKey", null, null));
+  public static final MappingSet COMPARE_NEXT_LEFT_MAPPING =
+      new MappingSet("nextLeftIndex", "null",
+                     GM("doSetup", "doCompareNextLeftKey", null, null),
+                     GM("doSetup", "doCompareNextLeftKey", null, null));
+
+  
   private final RecordBatch left;
   private final RecordBatch right;
   private final JoinStatus status;
   private JoinWorker worker;
+  private JoinCondition condition;
   public MergeJoinBatchBuilder batchBuilder;
   
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) {
@@ -35,11 +86,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     this.right = right;
     this.status = new JoinStatus(left, right, this);
     this.batchBuilder = new MergeJoinBatchBuilder(context, status);
+    this.condition = popConfig.getConditions().get(0);
+    // currently only one join condition is supported
+    assert popConfig.getConditions().size() == 1;
   }
 
   @Override
   public int getRecordCount() {
-    return status.outputPosition;
+    return status.getOutPosition();
   }
 
   @Override
@@ -50,24 +104,31 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     
     // loop so we can start over again if we find a new batch was created.
     while(true){
-      
+
+      // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
+      if (status.getOutcome() == JoinOutcome.BATCH_RETURNED ||
+          status.getOutcome() == JoinOutcome.SCHEMA_CHANGED)
+        allocateBatch();
+
+      // reset the output position to zero after our parent iterates this RecordBatch
+      if (status.getOutcome() == JoinOutcome.BATCH_RETURNED ||
+          status.getOutcome() == JoinOutcome.SCHEMA_CHANGED ||
+          status.getOutcome() == JoinOutcome.NO_MORE_DATA)
+        status.resetOutputPos();
+
       boolean first = false;
       if(worker == null){
         try {
-          this.worker = getNewWorker();
+          logger.debug("Creating New Worker");
+          this.worker = generateNewWorker();
           first = true;
-        } catch (ClassTransformationException | IOException e) {
+        } catch (ClassTransformationException | IOException | SchemaChangeException e) {
           context.fail(new SchemaChangeException(e));
           kill();
           return IterOutcome.STOP;
         }
       }
 
-      // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
-      if(status.getOutcome() == JoinOutcome.BATCH_RETURNED || status.getOutcome() == JoinOutcome.SCHEMA_CHANGED){
-        allocateBatch();
-      }
-
       // join until we have a complete outgoing batch
       worker.doJoin(status);
 
@@ -75,17 +136,19 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       switch(status.getOutcome()){
       case BATCH_RETURNED:
         // only return new schema if new worker has been setup.
+        logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
         return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
       case FAILURE:
         kill();
         return IterOutcome.STOP;
       case NO_MORE_DATA:
-        return status.outputPosition > 0 ? IterOutcome.OK: IterOutcome.NONE;
-      case MODE_CHANGED:
+        logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? "OK" : "NONE"));
+        return status.getOutPosition() > 0 ? IterOutcome.OK: IterOutcome.NONE;
       case SCHEMA_CHANGED:
         worker = null;
-        if(status.outputPosition > 0){
+        if(status.getOutPosition() > 0){
           // if we have current data, let's return that.
+          logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
           return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
         }else{
           // loop again to rebuild worker.
@@ -113,23 +176,243 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     right.kill();
   }
 
-  private JoinWorker getNewWorker() throws ClassTransformationException, IOException{
-    CodeGenerator<JoinWorker> cg = new CodeGenerator<JoinWorker>(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+  private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{
 
-    // if (status.rightSourceMode)
-      // generate copier which deref's SV4
-    // else
-      // generate direct copier.
-    
-    // generate comparator.
-    // generate compareNextLeftKey.
+    final CodeGenerator<JoinWorker> cg = new CodeGenerator<>(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    final ErrorCollector collector = new ErrorCollectorImpl();
+    final LogicalExpression leftFieldExpr = condition.getLeft();
+    final LogicalExpression rightFieldExpr = condition.getRight();
+
+    // Generate members and initialization code
+    /////////////////////////////////////////
+
+    // declare and assign JoinStatus member
+    cg.setMappingSet(SETUP_MAPPING);
+    JClass joinStatusClass = cg.getModel().ref(JoinStatus.class);
+    JVar joinStatus = cg.clazz.field(JMod.NONE, joinStatusClass, "status");
+    cg.getSetupBlock().assign(JExpr._this().ref(joinStatus), JExpr.direct("status"));
+
+    // declare and assign outgoing VectorContainer member
+    JClass vectorContainerClass = cg.getModel().ref(VectorContainer.class);
+    JVar outgoingVectorContainer = cg.clazz.field(JMod.NONE, vectorContainerClass, "outgoing");
+    cg.getSetupBlock().assign(JExpr._this().ref(outgoingVectorContainer), JExpr.direct("outgoing"));
+
+    // declare and assign incoming left RecordBatch member
+    JClass recordBatchClass = cg.getModel().ref(RecordBatch.class);
+    JVar incomingLeftRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incomingLeft");
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingLeftRecordBatch), joinStatus.ref("left"));
+
+    // declare and assign incoming right RecordBatch member
+    JVar incomingRightRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incomingRight");
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingRightRecordBatch), joinStatus.ref("right"));
+
+    // declare 'incoming' member so VVReadExpr generated code can point to the left or right batch
+    JVar incomingRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incoming");
+
+    // materialize value vector readers from join expression
+    final LogicalExpression materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector);
+    if (collector.hasErrors())
+      throw new ClassTransformationException(String.format(
+          "Failure while trying to materialize incoming left field.  Errors:\n %s.", collector.toErrorString()));
+
+    final LogicalExpression materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector);
+    if (collector.hasErrors())
+      throw new ClassTransformationException(String.format(
+          "Failure while trying to materialize incoming right field.  Errors:\n %s.", collector.toErrorString()));
+
+
+    // generate compare()
+    ////////////////////////
+    cg.setMappingSet(COMPARE_MAPPING);
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
+    CodeGenerator.HoldingContainer compareLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+    cg.setMappingSet(COMPARE_RIGHT_MAPPING);
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingRightRecordBatch));
+    CodeGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(materializedRightExpr, false);
+
+    if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()) {
+      // handle null == null
+      cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+          .cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
+          ._then()
+          ._return(JExpr.lit(0));
+  
+      // handle null == !null
+      cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+          .cor(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
+          ._then()
+          ._return(JExpr.lit(1));
+
+    } else if (compareLeftExprHolder.isOptional()) {
+      // handle null == required (null is less than any value)
+      cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)))
+          ._then()
+          ._return(JExpr.lit(-1));
+
+    } else if (compareRightExprHolder.isOptional()) {
+      // handle required == null (null is less than any value)
+      cg.getEvalBlock()._if(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))
+          ._then()
+          ._return(JExpr.lit(1));
+    }
+
+    // equality
+    cg.getEvalBlock()._if(compareLeftExprHolder.getValue().eq(compareRightExprHolder.getValue()))
+                     ._then()
+                       ._return(JExpr.lit(0));
+    // less than
+    cg.getEvalBlock()._if(compareLeftExprHolder.getValue().lt(compareRightExprHolder.getValue()))
+                     ._then()
+                       ._return(JExpr.lit(-1));
+    // greater than
+    cg.getEvalBlock()._return(JExpr.lit(1));
+
+
+    // generate compareNextLeftKey()
+    ////////////////////////////////
+    cg.setMappingSet(COMPARE_LEFT_MAPPING);
+    cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
+
+    // int nextLeftIndex = leftIndex + 1;
+    cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "nextLeftIndex", JExpr.direct("leftIndex").plus(JExpr.lit(1)));
+
+    // check if the next key is in this batch
+    cg.getEvalBlock()._if(joinStatus.invoke("isNextLeftPositionInCurrentBatch").eq(JExpr.lit(false)))
+                     ._then()
+                       ._return(JExpr.lit(-1));
+
+    // generate VV read expressions
+    CodeGenerator.HoldingContainer compareThisLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+    cg.setMappingSet(COMPARE_NEXT_LEFT_MAPPING); // change mapping from 'leftIndex' to 'nextLeftIndex'
+    CodeGenerator.HoldingContainer compareNextLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+
+    if (compareThisLeftExprHolder.isOptional()) {
+      // handle null == null
+      cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+                            .cand(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
+                       ._then()
+                         ._return(JExpr.lit(0));
+  
+      // handle null == !null
+      cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
+                            .cor(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
+                       ._then()
+                         ._return(JExpr.lit(1));
+    }
+
+    // check value equality
+    cg.getEvalBlock()._if(compareThisLeftExprHolder.getValue().eq(compareNextLeftExprHolder.getValue()))
+                     ._then()
+                       ._return(JExpr.lit(0));
+
+    // no match if reached
+    cg.getEvalBlock()._return(JExpr.lit(1));
+
+
+    // generate copyLeft()
+    //////////////////////
+    cg.setMappingSet(COPY_LEFT_MAPPING);
+    int vectorId = 0;
+    for (VectorWrapper<?> vw : left) {
+      JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft",
+                                                      new TypedFieldId(vw.getField().getType(), vectorId));
+      JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
+                                                       new TypedFieldId(vw.getField().getType(),vectorId, true));
+      // todo: check for room in vvOut
+      cg.getEvalBlock().add(vvOut.invoke("copyFrom")
+                                   .arg(COPY_LEFT_MAPPING.getValueReadIndex())
+                                   .arg(COPY_LEFT_MAPPING.getValueWriteIndex())
+                                   .arg(vvIn));
+      ++vectorId;
+    }
+    cg.getEvalBlock()._return(JExpr.lit(true));
+
+    // generate copyRight()
+    ///////////////////////
+    cg.setMappingSet(COPY_RIGHT_MAPPING);
+
+    int rightVectorBase = vectorId;
+    for (VectorWrapper<?> vw : right) {
+      JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight",
+                                                      new TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase));
+      JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
+                                                       new TypedFieldId(vw.getField().getType(),vectorId, true));
+      cg.getEvalBlock().add(vvOut.invoke("copyFrom")
+          .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
+          .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())
+          .arg(vvIn));
+      ++vectorId;
+    }
+    cg.getEvalBlock()._return(JExpr.lit(true));
 
     JoinWorker w = context.getImplementationClass(cg);
-    w.setupJoin(status, this.container);
+    w.setupJoin(context, status, this.container);
     return w;
   }
 
-  private void allocateBatch(){
+  private void allocateBatch() {
     // allocate new batch space.
+    container.clear();
+
+    // add fields from both batches
+    for (VectorWrapper<?> w : left) {
+      ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
+      getAllocator(w.getValueVector(), outgoingVector).alloc(left.getRecordCount() * 4);
+      container.add(outgoingVector);
+    }
+
+    for (VectorWrapper<?> w : right) {
+      ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
+      getAllocator(w.getValueVector(), outgoingVector).alloc(right.getRecordCount() * 4);
+      container.add(outgoingVector);
+    }
+
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    logger.debug("Built joined schema: {}", container.getSchema());
+  }
+
+  private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
+    if(outgoing instanceof FixedWidthVector){
+      return new FixedVectorAllocator((FixedWidthVector) outgoing);
+    }else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){
+      return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing);
+    }else{
+      throw new UnsupportedOperationException();
+    }
   }
+
+  private class FixedVectorAllocator implements VectorAllocator{
+    FixedWidthVector out;
+
+    public FixedVectorAllocator(FixedWidthVector out) {
+      super();
+      this.out = out;
+    }
+
+    public void alloc(int recordCount){
+      out.allocateNew(recordCount);
+      out.getMutator().setValueCount(recordCount);
+    }
+  }
+
+  private class VariableVectorAllocator implements VectorAllocator{
+    VariableWidthVector in;
+    VariableWidthVector out;
+
+    public VariableVectorAllocator(VariableWidthVector in, VariableWidthVector out) {
+      super();
+      this.in = in;
+      this.out = out;
+    }
+
+    public void alloc(int recordCount){
+      out.allocateNew(in.getByteCapacity(), recordCount);
+      out.getMutator().setValueCount(recordCount);
+    }
+  }
+
+  public interface VectorAllocator{
+    public void alloc(int recordCount);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
index d75cfb9..85ca43d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -40,19 +40,20 @@ public class MergeJoinBatchBuilder {
   private JoinStatus status;
 
   public MergeJoinBatchBuilder(FragmentContext context, JoinStatus status) {
+    this.container = new VectorContainer();
     this.status = status;
     this.svAllocator = context.getAllocator().getPreAllocator();
   }
 
   public boolean add(RecordBatch batch) {
     if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE)
-      throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+      throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch.");
     if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
 
     // resource checks
     long batchBytes = getSize(batch);
     if (batchBytes + runningBytes > Integer.MAX_VALUE) return false;      // TODO: 2GB is arbitrary
-    if (runningBatches + 1 > Character.MAX_VALUE) return false;           // allowed in batch.
+    if (runningBatches++ >= Character.MAX_VALUE) return false;            // allowed in batch.
     if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
 
     // transfer VVs to a new RecordBatchData
@@ -73,7 +74,6 @@ public class MergeJoinBatchBuilder {
 
   public void build() throws SchemaChangeException {
     container.clear();
-//    if (queuedRightBatches.keySet().size() > 1) throw new SchemaChangeException("Join currently only supports a single schema.");
     if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
     status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
     BatchSchema schema = queuedRightBatches.keySet().iterator().next();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
new file mode 100644
index 0000000..1b65227
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 2);
+    return new MergeJoinBatch(config, context, children.get(0), children.get(1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
new file mode 100644
index 0000000..38b8225
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -0,0 +1,220 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+
+public class TestMergeJoin {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestMergeJoin.class);
+
+  DrillConfig c = DrillConfig.create();
+
+  @Test
+  public void simpleEqualityJoin(@Injectable final DrillbitContext bitContext,
+                                 @Injectable UserServer.UserClientConnection connection) throws Throwable {
+    // Runs the plan in /join/merge_join.json through the merge join and expects 100 output records.
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/join/merge_join.json"), Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    // Drain the root operator, printing every batch as a padded table while counting records.
+    int totalRecordCount = 0;
+    while (exec.next()) {
+      totalRecordCount += exec.getRecordCount();
+      for (ValueVector v : exec)
+        System.out.print("[" + v.getField().getName() + "]        ");
+      System.out.println("\n");
+      for (int valueIdx = 0; valueIdx < exec.getRecordCount(); valueIdx++) {
+        List<Object> row = new ArrayList<Object>();
+        for (ValueVector v : exec) {
+          row.add(v.getAccessor().getObject(valueIdx));
+        }
+        for (Object cell : row) {
+          if (cell == null) {
+            System.out.print("<null>          ");
+            continue;
+          }
+          int len = cell.toString().length();
+          System.out.print(cell);
+          for (int i = 0; i < (14 - len); ++i)
+            System.out.print(" ");
+        }
+        System.out.println();
+      }
+      System.out.println();
+    }
+    System.out.println("Total Record Count: " + totalRecordCount);
+    assertEquals(100, totalRecordCount);
+    if (context.getFailureCause() != null)
+      throw context.getFailureCause();
+    assertTrue(!context.isFailed());
+
+  }
+
+  @Test
+  public void orderedEqualityJoin(@Injectable final DrillbitContext bitContext,
+                                  @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = reader.readPhysicalPlan(
+        Files.toString(
+            FileUtils.getResourceAsFile("/join/merge_single_batch.json"), Charsets.UTF_8)
+            .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
+            .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+    int totalRecordCount = 0;
+    while (exec.next()) {
+      totalRecordCount += exec.getRecordCount();
+      System.out.println("got next with record count: " + exec.getRecordCount() + " (total: " + totalRecordCount + "):");
+      System.out.println("       t1                 t2");
+                          
+      for (int valueIdx = 0; valueIdx < exec.getRecordCount(); valueIdx++) {
+        List<Object> row = Lists.newArrayList();
+        for (ValueVector v : exec)
+          row.add(v.getField().getName() + ":" + v.getAccessor().getObject(valueIdx));
+        for (Object cell : row) {
+          if (cell == null) {
+            System.out.print("<null>    ");
+            continue;
+          }
+          int len = cell.toString().length();
+          System.out.print(cell + " ");
+          for (int i = 0; i < (10 - len); ++i)
+            System.out.print(" ");
+        }
+        System.out.println();
+      }
+    }
+    System.out.println("Total Record Count: " + totalRecordCount);
+    assertEquals(25, totalRecordCount);
+
+    if (context.getFailureCause() != null)
+      throw context.getFailureCause();
+    assertTrue(!context.isFailed());
+
+  }
+
+
+  @Test
+  public void orderedEqualityMultiBatchJoin(@Injectable final DrillbitContext bitContext,
+                                            @Injectable UserServer.UserClientConnection connection) throws Throwable {
+
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getConfig(); result = c;
+    }};
+
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));
+    PhysicalPlan plan = reader.readPhysicalPlan(
+        Files.toString(
+            FileUtils.getResourceAsFile("/join/merge_multi_batch.json"), Charsets.UTF_8)
+            .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.left.json").toURI().toString())
+            .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_multi_batch.right.json").toURI().toString()));
+    FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+    int totalRecordCount = 0;
+    while (exec.next()) {
+      totalRecordCount += exec.getRecordCount();
+      System.out.println("got next with record count: " + exec.getRecordCount() + " (total: " + totalRecordCount + "):");
+
+      for (int valueIdx = 0; valueIdx < exec.getRecordCount(); valueIdx++) {
+        List<Object> row = Lists.newArrayList();
+        for (ValueVector v : exec)
+          row.add(v.getField().getName() + ":" + v.getAccessor().getObject(valueIdx));
+        for (Object cell : row) {
+          if (cell == null) {
+            System.out.print("<null>    ");
+            continue;
+          }
+          int len = cell.toString().length();
+          System.out.print(cell + " ");
+          for (int i = 0; i < (10 - len); ++i)
+            System.out.print(" ");
+        }
+        System.out.println();
+      }
+    }
+    System.out.println("Total Record Count: " + totalRecordCount);
+    assertEquals(25, totalRecordCount);
+
+    if (context.getFailureCause() != null)
+      throw context.getFailureCause();
+    assertTrue(!context.isFailed());
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception{
+    // pause to get logger to catch up.
+    Thread.sleep(1000);
+  }
+
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json
new file mode 100644
index 0000000..e6a92bb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_join.json
@@ -0,0 +1,52 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-sub-scan",
+      url: "http://source1.apache.org",
+      entries:[
+        {records: 100, types: [
+          {name: "blue", type: "INT", mode: "REQUIRED"},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id:2,
+      pop:"mock-sub-scan",
+      url: "http://source2.apache.org",
+      entries:[
+        {records: 50, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]},
+        {records: 50, types: [
+          {name: "blue1", type: "INT", mode: "REQUIRED"},
+          {name: "red1", type: "INT", mode: "REQUIRED"},
+          {name: "green1", type: "INT", mode: "REQUIRED"}
+        ]}
+      ]
+    },
+    {
+      @id: 3,
+      right: 1,
+      left: 2,
+      pop: "merge-join",
+      join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json
new file mode 100644
index 0000000..ebdfd38
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.json
@@ -0,0 +1,47 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"json-sub-scan",
+      readEntries:[
+        {path: "#{LEFT_FILE}"}
+      ],
+      engineConfig:{
+         "type":"json",
+         "dfsName" : "file:///"
+      }
+      
+    },
+    {
+      @id:2,
+      pop:"json-sub-scan",
+      readEntries:[
+        {path: "#{RIGHT_FILE}"}
+      ],
+      engineConfig:{
+         "type":"json",
+         "dfsName" : "file:///"
+      }
+      
+    },
+    {
+      @id: 3,
+      left: 1,
+      right: 2,
+      pop: "merge-join",
+      join-conditions: [ { relationship: "==", left: "a", right: "aa" } ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json
new file mode 100644
index 0000000..8cf4640
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.left.json
@@ -0,0 +1,13 @@
+{"a":1, "b":100, "leftbatch0": 0}
+{"a":1, "b":200, "leftbatch0": 0}
+{"a":1, "b":300, "leftbatch0": 0}
+{"a":2, "b":400, "leftbatch0": 0}
+{"a":2, "b":500, "leftbatch0": 0}
+{"a":2, "b":600, "leftbatch1": 1}
+{"a":3, "b":700, "leftbatch1": 1}
+{"a":4, "b":800, "leftbatch1": 1}
+{"a":5, "b":900, "leftbatch1": 1}
+{"a":6, "b":1000, "leftbatch1": 1}
+{"a":7, "b":1100, "leftbatch1": 1}
+{"a":8, "b":1200, "leftbatch1": 1}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json
new file mode 100644
index 0000000..30aa62d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_multi_batch.right.json
@@ -0,0 +1,11 @@
+{"bb":10000, "aa":1, "rightbatch0": 0}
+{"bb":20000, "aa":2, "rightbatch0": 0}
+{"bb":30000, "aa":2, "rightbatch0": 0}
+{"bb":40000, "aa":2, "rightbatch0": 0}
+{"bb":50000, "aa":2, "rightbatch1": 1}
+{"bb":60000, "aa":2, "rightbatch1": 1}
+{"bb":70000, "aa":3, "rightbatch1": 1}
+{"bb":80000, "aa":4, "rightbatch2": 2}
+{"bb":90000, "aa":6, "rightbatch3": 3}
+{"bb":100000, "aa":7, "rightbatch4": 4}
+{"bb":110000, "aa":7, "rightbatch5": 5}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
new file mode 100644
index 0000000..0e4f79d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
@@ -0,0 +1,37 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"json-sub-scan",
+      entries:[
+        {url: "#{LEFT_FILE}"}
+      ]
+    },
+    {
+      @id:2,
+      pop:"json-sub-scan",
+      entries:[
+        {url: "#{RIGHT_FILE}"}
+      ]
+    },
+    {
+      @id: 3,
+      left: 1,
+      right: 2,
+      pop: "merge-join",
+      join-conditions: [ { relationship: "==", left: "a", right: "aa" } ]
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json
new file mode 100644
index 0000000..e7bab08
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.left.json
@@ -0,0 +1,13 @@
+{"a":1, "b":100}
+{"a":1, "b":200}
+{"a":1, "b":300}
+{"a":2, "b":400}
+{"a":2, "b":500}
+{"a":2, "b":600}
+{"a":3, "b":700}
+{"a":4, "b":800}
+{"a":5, "b":900}
+{"a":6, "b":1000}
+{"a":7, "b":1100}
+{"a":8, "b":1200}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0bac2f0/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json
new file mode 100644
index 0000000..d99a145
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.right.json
@@ -0,0 +1,11 @@
+{"bb":10000, "aa":1}
+{"bb":20000, "aa":2}
+{"bb":30000, "aa":2}
+{"bb":40000, "aa":2}
+{"bb":50000, "aa":2}
+{"bb":60000, "aa":2}
+{"bb":70000, "aa":3}
+{"bb":80000, "aa":4}
+{"bb":90000, "aa":6}
+{"bb":100000, "aa":7}
+{"bb":110000, "aa":7}


[2/7] git commit: DRILL-190 (part1) First pass at join operator (includes work from JN).

Posted by ja...@apache.org.
DRILL-190 (part1) First pass at join operator (includes work from JN).


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8ceee5db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8ceee5db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8ceee5db

Branch: refs/heads/master
Commit: 8ceee5dbd23c4039b3738b3de8d51039aae8e910
Parents: dddae74
Author: Ben Becker <be...@gmail.com>
Authored: Thu Aug 15 21:16:27 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Aug 28 20:35:05 2013 -0700

----------------------------------------------------------------------
 .../physical/base/AbstractPhysicalVisitor.java  |  19 ++-
 .../exec/physical/base/PhysicalVisitor.java     |  15 +-
 .../exec/physical/config/MergeJoinPOP.java      |  64 ++++++++
 .../exec/physical/impl/join/JoinEvaluator.java  |   9 ++
 .../exec/physical/impl/join/JoinStatus.java     | 154 ++++++++++++++++++
 .../exec/physical/impl/join/JoinTemplate.java   | 155 +++++++++++++++++++
 .../exec/physical/impl/join/JoinWorker.java     |  20 +++
 .../exec/physical/impl/join/MergeJoinBatch.java | 135 ++++++++++++++++
 .../impl/join/MergeJoinBatchBuilder.java        | 128 +++++++++++++++
 9 files changed, 695 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index cf6cb47..ad41452 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -17,7 +17,17 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.base;
 
-import org.apache.drill.exec.physical.config.*;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.RangeSender;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.UnionExchange;
 
 public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
@@ -28,7 +38,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
-  public T visitUnion(Union union, X value) throws E {
+  public T visitUnion(UnionExchange union, X value) throws E {
     return visitOp(union, value);
   }
 
@@ -87,6 +97,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
   
   @Override
+  public T visitMergeJoin(MergeJoinPOP join, X value) throws E {
+    return visitOp(join, value);
+  }
+
+  @Override
   public T visitHashPartitionSender(HashPartitionSender op, X value) throws E {
     return visitSender(op, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 9f76693..5f50422 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -17,7 +17,17 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.base;
 
-import org.apache.drill.exec.physical.config.*;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.RangeSender;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.UnionExchange;
 
 /**
  * Visitor class designed to traversal of a operator tree.  Basis for a number of operator manipulations including fragmentation and materialization.
@@ -35,9 +45,10 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
 
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
-  public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
+  public RETURN visitUnion(UnionExchange union, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
+  public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
   public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;
   public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;
   public RETURN visitStreamingAggregate(StreamingAggregate agg, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
new file mode 100644
index 0000000..05fee19
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -0,0 +1,64 @@
+package org.apache.drill.exec.physical.config;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+@JsonTypeName("merge-join")
+public class MergeJoinPOP extends AbstractBase{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinPOP.class);
+
+  
+  private PhysicalOperator left;
+  private PhysicalOperator right;
+  private List<JoinCondition> conditions;
+  
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(0,0,0,0);
+  }
+
+  @JsonCreator
+  public MergeJoinPOP(
+      @JsonProperty("left") PhysicalOperator left, 
+      @JsonProperty("right") PhysicalOperator right,
+      @JsonProperty("join-conditions") List<JoinCondition> conditions 
+      ) {
+    this.left = left;
+    this.right = right;
+    this.conditions = conditions;
+  }
+  
+  @Override
+  public Size getSize() {
+    return left.getSize().add(right.getSize());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitMergeJoin(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.size() == 2);
+    return new MergeJoinPOP(children.get(0), children.get(1), conditions);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.forArray(left, right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
new file mode 100644
index 0000000..42ca604
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
@@ -0,0 +1,9 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface JoinEvaluator {
+  public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
+  public abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
+  public abstract int compare(int leftPosition, int rightPosition);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
new file mode 100644
index 0000000..8831006
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -0,0 +1,154 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * The status of the current join.  Maintained outside the individually compiled join templates so that we can carry status across multiple schemas.
+ */
+public final class JoinStatus {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
+
+  public static enum RightSourceMode {
+    INCOMING_BATCHES, QUEUED_BATCHES;
+  }
+
+  public int leftPosition;
+  private final RecordBatch left;
+  private IterOutcome lastLeft;
+
+  public int rightPosition;
+  public int svRightPosition;
+  private final RecordBatch right;
+  private IterOutcome lastRight;
+  
+  public int outputPosition;
+  public RightSourceMode rightSourceMode = RightSourceMode.INCOMING_BATCHES;
+  public MergeJoinBatch outputBatch;
+  public SelectionVector4 sv4;
+
+  private boolean initialSet = false;
+  private boolean leftRepeating = false;
+  
+  public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) {
+    super();
+    this.left = left;
+    this.right = right;
+    this.outputBatch = output;
+  }
+
+  public final void ensureInitial(){
+    if(!initialSet){
+      this.lastLeft = left.next();
+      this.lastRight = right.next();
+      initialSet = true;
+    }
+  }
+  
+  public final void advanceLeft(){
+    leftPosition++;
+  }
+
+  public final void advanceRight(){
+    if (rightSourceMode == RightSourceMode.INCOMING_BATCHES)
+      rightPosition++;
+    else {
+      // advance through queued batches
+    }
+  }
+
+  public final int getLeftPosition() {
+    return leftPosition;
+  }
+
+  public final int getRightPosition() {
+    return (rightSourceMode == RightSourceMode.INCOMING_BATCHES) ? rightPosition : svRightPosition;
+  }
+
+  public final void notifyLeftRepeating() {
+    leftRepeating = true;
+    outputBatch.resetBatchBuilder();
+  }
+
+  public final void notifyLeftStoppedRepeating() {
+    leftRepeating = false;
+  }
+
+  public final boolean isLeftRepeating() {
+    return leftRepeating;
+  }
+
+  public void setDefaultAdvanceMode() {
+    rightSourceMode = RightSourceMode.INCOMING_BATCHES;
+    rightPosition = 0;
+  }
+
+  public void setRepeatedAdvanceMode() {
+    rightSourceMode = RightSourceMode.QUEUED_BATCHES;
+    svRightPosition = 0;
+  }
+
+  /**
+   * Check if the left record position can advance by one.
+   * Side effect: advances to next left batch if current left batch size is exceeded.
+   */
+  public final boolean isLeftPositionAllowed(){
+    if(!isNextLeftPositionInCurrentBatch()){
+      leftPosition = 0;
+      lastLeft = left.next();
+      return lastLeft == IterOutcome.OK;
+    }else{
+      lastLeft = IterOutcome.OK;
+      return true;
+    }
+  }
+
+  /**
+   * Check if the right record position can advance by one.
+   * Side effect: when the current right batch is exhausted, resets rightPosition to 0 and pulls the next
+   * right batch; in that case returns true only if that batch arrived with IterOutcome.OK.
+   */
+  public final boolean isRightPositionAllowed(){
+    if(!isNextRightPositionInCurrentBatch()){
+      rightPosition = 0;
+      lastRight = right.next();
+      return lastRight == IterOutcome.OK;
+    }else{
+      lastRight = IterOutcome.OK;
+      return true;
+    }
+  }
+
+  /**
+   * Check if the left record position can advance by one in the current batch.
+   */
+  public final boolean isNextLeftPositionInCurrentBatch() {
+    return leftPosition < left.getRecordCount();
+  }
+
+  /**
+   * Check if the right record position is still inside the current right batch.
+   */
+  public final boolean isNextRightPositionInCurrentBatch() {
+    return rightPosition < right.getRecordCount();
+  }
+
+  public JoinOutcome getOutcome(){
+    if (lastLeft == IterOutcome.OK && lastRight == IterOutcome.OK)
+      return JoinOutcome.BATCH_RETURNED;
+    if (eitherMatches(IterOutcome.OK_NEW_SCHEMA))
+      return JoinOutcome.SCHEMA_CHANGED;
+    if (eitherMatches(IterOutcome.NOT_YET))
+      return JoinOutcome.WAITING;
+    if (eitherMatches(IterOutcome.NONE))
+      return JoinOutcome.NO_MORE_DATA;
+    return JoinOutcome.FAILURE;
+  }
+  
+  private boolean eitherMatches(IterOutcome outcome){
+    return lastLeft == outcome || lastRight == outcome;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
new file mode 100644
index 0000000..51cc5e5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -0,0 +1,155 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * This join template uses a merge join to combine two ordered streams into a single larger batch.  When joining
+ * single values on each side, the values can be copied to the outgoing batch immediately.  The outgoing record batch
+ * should be sent as needed (e.g. schema change or outgoing batch full).  When joining multiple values on one or
+ * both sides, two passes over the vectors will be made; one to construct the selection vector, and another to
+ * generate the outgoing batches once the duplicate value is no longer encountered.
+ *
+ * Given two tables ordered by 'key':
+ *
+ *        t1                t2
+ *  ---------------   ---------------
+ *  | key | col2 |    | key | col2 |
+ *  ---------------   ---------------
+ *  |  1  | 'ab' |    |  1  | 'AB' |
+ *  |  2  | 'cd' |    |  2  | 'CD' |
+ *  |  2  | 'ef' |    |  4  | 'EF' |
+ *  |  4  | 'gh' |    |  4  | 'GH' |
+ *  |  4  | 'ij' |    |  5  | 'IJ' |
+ *  ---------------   ---------------
+ *
+ * 'SELECT * FROM t1 INNER JOIN t2 on (t1.key == t2.key)' should generate the following:
+ *
+ * ---------------------------------------
+ * | t1.key | t2.key | t1.col2 | t2.col2 |
+ * ---------------------------------------
+ * |   1    |   1    |  'ab'   |  'AB'   |
+ * |   2    |   2    |  'cd'   |  'CD'   |
+ * |   2    |   2    |  'ef'   |  'CD'   |
+ * |   4    |   4    |  'gh'   |  'EF'   |
+ * |   4    |   4    |  'gh'   |  'GH'   |
+ * |   4    |   4    |  'ij'   |  'EF'   |
+ * |   4    |   4    |  'ij'   |  'GH'   |
+ * ---------------------------------------
+ *
+ * In the simple match case, only one row from each table matches.  Additional cases should be considered:
+ *   - a left join key matches multiple right join keys
+ *   - duplicate keys which may span multiple record batches (on the left and/or right side)
+ *   - one or both incoming record batches change schemas
+ *
+ * In the case where a left join key matches multiple right join keys:
+ *   - add a reference to all of the right table's matching values to the SV4.
+ *
+ * A RecordBatchData object should be used to hold onto all batches which have not been sent.
+ *
+ * JoinStatus:
+ *   - all state related to the join operation is stored in the JoinStatus object.
+ *   - this is required since code may be regenerated before completion of an outgoing record batch.
+ */
+public abstract class JoinTemplate implements JoinWorker {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinTemplate.class);
+
+  /**
+   * No-op in this template: neither argument is used.
+   */
+  @Override
+  public void setupJoin(JoinStatus status, VectorContainer outgoing){
+    
+  }
+
+  /**
+   * Copy rows from the input record batches until the output record batch is full
+   * or one of the inputs cannot supply a valid position (the method then simply returns; the
+   * reason is left in the JoinStatus).
+   * @param status  State of the join operation (persists across multiple record batches/schema changes)
+   */
+  public final void doJoin(final JoinStatus status) {
+    while (true) {
+      // for each record
+
+      // validate position and advance to the next record batch if necessary
+      if (!status.isLeftPositionAllowed()) return;
+      if (!status.isRightPositionAllowed()) return;
+
+      // compare() must return exactly -1, 0 or 1; anything else hits the default branch below.
+      int comparison = compare(status.leftPosition, status.rightPosition);
+      switch (comparison) {
+
+      case -1:
+        // left key < right key
+        status.advanceLeft();
+        continue;
+
+      case 0:
+        // left key == right key
+        if (!status.isLeftRepeating() &&
+            status.isNextLeftPositionInCurrentBatch() &&
+            compareNextLeftKey(status.leftPosition) == 0) {
+          // records in the left batch contain duplicate keys
+          // TODO: leftHasDups = true, if next left key matches but is in a new batch
+          status.notifyLeftRepeating();
+        }
+        
+        do {
+          // copy all equal right keys to the output record batch
+          // NOTE(review): outputPosition is post-incremented even when copy() returns false.
+          if (!copy(status.leftPosition, status.rightPosition, status.outputPosition++))
+            return;
+
+          // If the left key has duplicates and we're about to cross batch boundaries, queue the
+          // right table's record batch before calling next.  These records will need to be copied
+          // again for each duplicate left key.
+          if (status.isLeftRepeating() && !status.isNextRightPositionInCurrentBatch()) {
+            // last record in right batch is a duplicate, and at the end of the batch
+            status.outputBatch.addRightToBatchBuilder();
+          }
+          status.advanceRight();
+        } while (status.isRightPositionAllowed() && compare(status.leftPosition, status.rightPosition) == 0);
+
+        status.advanceLeft();
+
+        // NOTE(review): unlike the check before the loop, compareNextLeftKey is called here without
+        // first testing isNextLeftPositionInCurrentBatch() -- confirm this cannot read past the batch.
+        if (status.isLeftRepeating() && compareNextLeftKey(status.leftPosition) != 0) {
+          // left no longer has duplicates.  switch back to incoming batch mode
+          status.setDefaultAdvanceMode();
+          status.notifyLeftStoppedRepeating();
+        } else if (status.isLeftRepeating()) {
+          // left is going to repeat; use sv4 for right batch
+          status.setRepeatedAdvanceMode();
+        }          
+
+        continue;
+
+      case 1:
+        // left key > right key
+        status.advanceRight();
+        continue;
+
+      default:
+        throw new IllegalStateException();
+      }
+    }
+  }
+
+  
+  /**
+   * Copy the data to the new record batch (if it fits).
+   *
+   * @param leftPosition  position of batch (lower 16 bits) and record (upper 16 bits) in left SV4
+   * @param rightPosition position of batch (lower 16 bits) and record (upper 16 bits) in right SV4
+   * @param outputPosition position of the output record batch
+   * @return Whether or not the data was copied.
+   */
+  protected abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
+  
+  /**
+   * Compare the values of the left and right join key to determine whether the left is less than, greater than
+   * or equal to the right.
+   *
+   * @param leftPosition
+   * @param rightPosition
+   * @return  0 if both keys are equal
+   *         -1 if left is < right
+   *          1 if left is > right
+   */
+  protected abstract int compare(int leftPosition, int rightPosition);
+
+  /**
+   * Compare the left join key at the given position with the following left key.
+   * NOTE(review): presumably compares position against position + 1 -- confirm against the implementation.
+   *
+   * @param position position of the current left record
+   * @return 0 when the keys are equal (doJoin only distinguishes zero from non-zero)
+   */
+  protected abstract int compareNextLeftKey(int position);
+
+  /**
+   * NOTE(review): not called anywhere in this template; presumably binds the implementation to the
+   * incoming and outgoing batches -- confirm against the caller.
+   */
+  public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
new file mode 100644
index 0000000..54d2076
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -0,0 +1,20 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.record.VectorContainer;
+
+
+/**
+ * Contract of the join worker: it is set up once with the shared {@link JoinStatus} and the outgoing
+ * container, and is then driven through {@link #doJoin(JoinStatus)}.
+ */
+public interface JoinWorker {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinWorker.class);
+  
+  /** Possible results of a join step, as reported by JoinStatus.getOutcome(). */
+  public static enum JoinOutcome {
+    NO_MORE_DATA, BATCH_RETURNED, MODE_CHANGED, SCHEMA_CHANGED, WAITING, FAILURE;
+  }
+  
+  // The template class name must match the package JoinTemplate is declared in
+  // (org.apache.drill.exec.physical.impl.join); the former "mergejoin" package does not exist.
+  public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<JoinWorker>( //
+      JoinWorker.class, "org.apache.drill.exec.physical.impl.join.JoinTemplate", JoinEvaluator.class, null);
+
+  
+  /**
+   * Prepare the worker before the first call to {@link #doJoin(JoinStatus)}.
+   *
+   * @param status   state of the join operation
+   * @param outgoing container for the outgoing batch
+   */
+  public void setupJoin(JoinStatus status, VectorContainer outgoing);
+
+  /**
+   * Run the join against the given state.
+   *
+   * @param status state of the join operation
+   */
+  public void doJoin(JoinStatus status);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
new file mode 100644
index 0000000..4d633bb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -0,0 +1,135 @@
+package org.apache.drill.exec.physical.impl.join;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * A merge join combining two incoming in-order batches.
+ */
+public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
+  
+  private final RecordBatch left;
+  private final RecordBatch right;
+  // join state shared with the worker; survives worker regeneration
+  private final JoinStatus status;
+  // null until first use and after a mode/schema change; rebuilt lazily in next()
+  private JoinWorker worker;
+  public MergeJoinBatchBuilder batchBuilder;
+  
+  /**
+   * @param popConfig operator configuration
+   * @param context   fragment context, also used to create the batch builder
+   * @param left      left input
+   * @param right     right input
+   */
+  protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) {
+    super(popConfig, context);
+    this.left = left;
+    this.right = right;
+    this.status = new JoinStatus(left, right, this);
+    this.batchBuilder = new MergeJoinBatchBuilder(context, status);
+  }
+
+  /**
+   * @return the current output position of the join status
+   */
+  @Override
+  public int getRecordCount() {
+    return status.outputPosition;
+  }
+
+  /**
+   * Run the join worker and map the resulting join outcome onto an IterOutcome.
+   * A worker is created on demand and dropped again on MODE_CHANGED / SCHEMA_CHANGED.
+   */
+  @Override
+  public IterOutcome next() {
+    
+    // we do this here instead of in the constructor because we don't necessarily want to start consuming on construction.
+    status.ensureInitial();
+    
+    // loop so we can start over again if we find a new batch was created.
+    while(true){
+      
+      // true only in the iteration that created a new worker
+      boolean first = false;
+      if(worker == null){
+        try {
+          this.worker = getNewWorker();
+          first = true;
+        } catch (ClassTransformationException | IOException e) {
+          context.fail(new SchemaChangeException(e));
+          kill();
+          return IterOutcome.STOP;
+        }
+      }
+
+      // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
+      if(status.getOutcome() == JoinOutcome.BATCH_RETURNED || status.getOutcome() == JoinOutcome.SCHEMA_CHANGED){
+        allocateBatch();
+      }
+
+      // join until we have a complete outgoing batch
+      worker.doJoin(status);
+
+      // get the outcome of the join.
+      switch(status.getOutcome()){
+      case BATCH_RETURNED:
+        // only return new schema if new worker has been setup.
+        return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+      case FAILURE:
+        kill();
+        return IterOutcome.STOP;
+      case NO_MORE_DATA:
+        // flush whatever has been written before reporting the end of the data
+        return status.outputPosition > 0 ? IterOutcome.OK: IterOutcome.NONE;
+      case MODE_CHANGED:
+      case SCHEMA_CHANGED:
+        // dropping the worker forces getNewWorker() on the next pass
+        worker = null;
+        if(status.outputPosition > 0){
+          // if we have current data, let's return that.
+          return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+        }else{
+          // loop again to rebuild worker.
+          continue;
+        }
+      case WAITING:
+        return IterOutcome.NOT_YET;
+      default:
+        throw new IllegalStateException();
+      }
+    }
+  }
+
+  /**
+   * Replace the batch builder with a fresh, empty one.
+   */
+  public void resetBatchBuilder() {
+    batchBuilder = new MergeJoinBatchBuilder(context, status);
+  }
+
+  /**
+   * Queue the current right batch in the batch builder.
+   * NOTE(review): the boolean returned by MergeJoinBatchBuilder.add() is ignored here.
+   */
+  public void addRightToBatchBuilder() {
+    batchBuilder.add(right);
+  }
+
+  /**
+   * Propagate a kill to both inputs.
+   */
+  @Override
+  protected void killIncoming() {
+    left.kill();
+    right.kill();
+  }
+
+  /**
+   * Create a worker from the JoinWorker template and set it up with the current status and container.
+   * The code generation steps listed in the comments below are not implemented yet.
+   */
+  private JoinWorker getNewWorker() throws ClassTransformationException, IOException{
+    CodeGenerator<JoinWorker> cg = new CodeGenerator<JoinWorker>(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+
+    // if (status.rightSourceMode)
+      // generate copier which deref's SV4
+    // else
+      // generate direct copier.
+    
+    // generate comparator.
+    // generate compareNextLeftKey.
+
+    JoinWorker w = context.getImplementationClass(cg);
+    w.setupJoin(status, this.container);
+    return w;
+  }
+
+  /**
+   * Placeholder: currently allocates nothing.
+   */
+  private void allocateBatch(){
+    // allocate new batch space.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ceee5db/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
new file mode 100644
index 0000000..d75cfb9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -0,0 +1,128 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.List;
+
+/**
+ * Queues right-side record batches for a merge join and, on {@link #build()}, creates a
+ * {@link SelectionVector4} in the {@link JoinStatus} that addresses every queued record and gathers
+ * the queued vectors into hyper lists of a container.
+ */
+public class MergeJoinBatchBuilder {
+
+  private final ArrayListMultimap<BatchSchema, RecordBatchData> queuedRightBatches = ArrayListMultimap.create();
+  // Created eagerly: build() clears and repopulates it, and the field was never assigned before,
+  // which made build() fail with a NullPointerException.
+  private final VectorContainer container = new VectorContainer();
+  private int runningBytes;
+  private int runningBatches;
+  private int recordCount;
+  private PreAllocator svAllocator;
+  private JoinStatus status;
+
+  /**
+   * @param context fragment context supplying the allocator for the selection vector
+   * @param status  join state that receives the built selection vector
+   */
+  public MergeJoinBatchBuilder(FragmentContext context, JoinStatus status) {
+    this.status = status;
+    this.svAllocator = context.getAllocator().getPreAllocator();
+  }
+
+  /**
+   * Queue a record batch.
+   *
+   * @param batch the batch to queue; empty batches are accepted but not stored
+   * @return false if queuing the batch would exceed the byte limit, the batch-count limit, or the
+   *         selection vector pre-allocation; true otherwise
+   * @throws UnsupportedOperationException if the batch carries a four-byte selection vector
+   */
+  public boolean add(RecordBatch batch) {
+    if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE)
+      throw new UnsupportedOperationException("A merge join cannot currently queue a sv4 batch.");
+    if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
+
+    // resource checks
+    long batchBytes = getSize(batch);
+    if (batchBytes + runningBytes > Integer.MAX_VALUE) return false;      // TODO: 2GB is arbitrary
+    if (runningBatches + 1 > Character.MAX_VALUE) return false;           // allowed in batch.
+    if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
+
+    // transfer VVs to a new RecordBatchData
+    RecordBatchData bd = new RecordBatchData(batch);
+    runningBytes += batchBytes;
+    // keep the batch counter in step with the queue so the limit check above can actually trip
+    runningBatches++;
+    queuedRightBatches.put(batch.getSchema(), bd);
+    recordCount += bd.getRecordCount();
+    return true;
+  }
+
+  /**
+   * @return the sum of the buffer sizes of all value vectors in the batch
+   */
+  private long getSize(RecordBatch batch){
+    long bytes = 0;
+    for(VectorWrapper<?> v : batch){
+      bytes += v.getValueVector().getBufferSize();
+    }
+    return bytes;
+  }
+
+  /**
+   * Build the selection vector over the queued batches and collect their vectors into the container.
+   * NOTE(review): the selection vector is filled from the batches of the first schema only, while
+   * the hyper lists below are built from the batches of every schema; the single-schema check is
+   * commented out. At least one batch must have been queued, otherwise the schema lookup fails.
+   *
+   * @throws SchemaChangeException if more batches are queued than can be addressed
+   */
+  public void build() throws SchemaChangeException {
+    container.clear();
+//    if (queuedRightBatches.keySet().size() > 1) throw new SchemaChangeException("Join currently only supports a single schema.");
+    if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+    status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+    BatchSchema schema = queuedRightBatches.keySet().iterator().next();
+    List<RecordBatchData> data = queuedRightBatches.get(schema);
+
+    // now we're going to generate the sv4 pointers
+    switch(schema.getSelectionVectorMode()){
+      case NONE: {
+        // no selection vector: every record of every batch is addressed in order
+        int index = 0;
+        int recordBatchId = 0;
+        for(RecordBatchData d : data){
+          for(int i =0; i < d.getRecordCount(); i++, index++){
+            status.sv4.set(index, recordBatchId, i);
+          }
+          recordBatchId++;
+        }
+        break;
+      }
+      case TWO_BYTE: {
+        // two-byte selection vector: resolve each entry through the batch's sv2
+        int index = 0;
+        int recordBatchId = 0;
+        for(RecordBatchData d : data){
+          for(int i =0; i < d.getRecordCount(); i++, index++){
+            status.sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
+          }
+          // might as well drop the selection vector since we'll stop using it now.
+          d.getSv2().clear();
+          recordBatchId++;
+        }
+        break;
+      }
+      default:
+        throw new UnsupportedOperationException();
+    }
+
+    // next, we'll create lists of each of the vector types.
+    ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
+    for (RecordBatchData rbd : queuedRightBatches.values()) {
+      for (ValueVector v : rbd.getVectors()) {
+        vectors.put(v.getField(), v);
+      }
+    }
+
+    for(MaterializedField f : vectors.keySet()){
+      List<ValueVector> v = vectors.get(f);
+      container.addHyperList(v);
+    }
+
+    container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+  }
+
+}


[5/7] git commit: DRILL-121 RPM and DEB packages

Posted by ja...@apache.org.
DRILL-121 RPM and DEB packages


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2bcf0547
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2bcf0547
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2bcf0547

Branch: refs/heads/master
Commit: 2bcf0547a0d230e65d8a935c634e01608c19eb5d
Parents: 5232b0e
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Aug 21 19:23:28 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Aug 28 20:37:53 2013 -0700

----------------------------------------------------------------------
 sandbox/prototype/distribution/pom.xml          | 214 +++++++++++++++++
 .../prototype/distribution/src/assemble/bin.xml |  81 +++++++
 .../distribution/src/deb/control/conffiles      |   3 +
 .../distribution/src/deb/control/control        |   8 +
 .../distribution/src/resources/drill-config.sh  |  96 ++++++++
 .../distribution/src/resources/drill-env.sh     |   2 +
 .../src/resources/drill-override.conf           |  36 +++
 .../distribution/src/resources/drillbit         | 166 +++++++++++++
 .../distribution/src/resources/drillbit.sh      | 230 +++++++++++++++++++
 .../distribution/src/resources/logback.xml      |  36 +++
 .../prototype/distribution/src/resources/runbit |  30 +++
 .../distribution/src/resources/sqlline          |  30 +++
 .../src/main/resources/drill-module.conf        |   2 +-
 .../java-exec/src/main/resources/logback.xml    |  49 ++++
 .../exec/java-exec/src/main/sh/drill-config.sh  |  96 ++++++++
 .../exec/java-exec/src/main/sh/drillbit.sh      | 230 +++++++++++++++++++
 .../prototype/exec/java-exec/src/main/sh/runbit |  30 +++
 sandbox/prototype/pom.xml                       |  10 +
 18 files changed, 1348 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/pom.xml b/sandbox/prototype/distribution/pom.xml
new file mode 100644
index 0000000..934c81a
--- /dev/null
+++ b/sandbox/prototype/distribution/pom.xml
@@ -0,0 +1,214 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>prototype-parent</artifactId>
+    <groupId>org.apache.drill</groupId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.drill</groupId>
+  <artifactId>distribution</artifactId>
+  <packaging>pom</packaging>
+  <name>Packaging</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>java-exec</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+      <dependency>
+          <groupId>org.apache.drill.exec</groupId>
+          <artifactId>netty-bufferl</artifactId>
+          <version>4.0.7.Final</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.drill.exec</groupId>
+          <artifactId>ref</artifactId>
+          <version>1.0-SNAPSHOT</version>
+      </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>common</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>planner</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>sqlparser</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>distro-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <descriptors>
+                <descriptor>src/assemble/bin.xml</descriptor>
+              </descriptors>
+              <finalName>drill-1.0</finalName>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>rpm</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>rpm-maven-plugin</artifactId>
+            <version>2.1-alpha-3</version>
+            <executions>
+              <execution>
+                <id>generate-rpm</id>
+                <goals>
+                  <goal>rpm</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <copyright>2013 ASF</copyright>
+              <group>Apache Software Foundation</group>
+              <prefix>/opt</prefix>
+              <release>SNAPSHOT</release>
+              <version>1.0</version>
+              <name>drill</name>
+              <mappings>
+                <mapping>
+                  <directory>/opt/drill/bin</directory>
+                  <sources>
+                    <source>
+                      <location>target/drill-1.0-bin/drill-1.0/bin</location>
+                    </source>
+                  </sources>
+                </mapping>
+                <mapping>
+                  <directory>/opt/drill/lib</directory>
+                  <sources>
+                    <source>
+                      <location>target/drill-1.0-bin/drill-1.0/lib</location>
+                    </source>
+                  </sources>
+                </mapping>
+                <mapping>
+                  <directory>/opt/drill/jars</directory>
+                  <sources>
+                    <source>
+                      <location>target/drill-1.0-bin/drill-1.0/jars</location>
+                    </source>
+                  </sources>
+                </mapping>
+                <mapping>
+                  <directory>/etc/drill/conf</directory>
+                  <sources>
+                    <source>
+                      <location>target/drill-1.0-bin/drill-1.0/conf</location>
+                    </source>
+                  </sources>
+                  <configuration>true</configuration>
+                </mapping>
+                <mapping>
+                  <directory>/etc/init.d/</directory>
+                  <sources>
+                    <source>
+                      <location>src/resources/drillbit</location>
+                    </source>
+                  </sources>
+                  <directoryIncluded>false</directoryIncluded>
+                </mapping>
+              </mappings>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>deb</id>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>jdeb</artifactId>
+            <groupId>org.vafer</groupId>
+            <version>1.0</version>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                <goals>
+                  <goal>jdeb</goal>
+                </goals>
+                <configuration>
+                  <deb>target/drill-1.0-SNAPSHOT.deb</deb>
+                  <dataSet>
+                    <data>
+                      <src>target/drill-1.0-bin/drill-1.0/lib</src>
+                      <type>directory</type>
+                      <mapper>
+                        <type>perm</type>
+                        <prefix>/opt/drill/lib/</prefix>
+                      </mapper>
+                    </data>
+                    <data>
+                      <src>target/drill-1.0-bin/drill-1.0/jars</src>
+                      <type>directory</type>
+                      <mapper>
+                        <type>perm</type>
+                        <prefix>/opt/drill/jars/</prefix>
+                      </mapper>
+                    </data>
+                    <data>
+                      <src>target/drill-1.0-bin/drill-1.0/bin</src>
+                      <type>directory</type>
+                      <mapper>
+                        <type>perm</type>
+                        <prefix>/opt/drill/bin/</prefix>
+                        <filemode>755</filemode>
+                      </mapper>
+                    </data>
+                    <data>
+                      <src>target/drill-1.0-bin/drill-1.0/conf</src>
+                      <type>directory</type>
+                      <mapper>
+                        <type>perm</type>
+                        <prefix>/etc/drill/conf</prefix>
+                        <filemode>755</filemode>
+                      </mapper>
+                    </data>
+                    <data>
+                      <src>src/resources/drillbit</src>
+                      <dst>/etc/init.d/drillbit</dst>
+                      <type>file</type>
+                      <mapper>
+                        <type>perm</type>
+                        <filemode>755</filemode>
+                      </mapper>
+                    </data>
+                  </dataSet>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/assemble/bin.xml b/sandbox/prototype/distribution/src/assemble/bin.xml
new file mode 100644
index 0000000..5276c69
--- /dev/null
+++ b/sandbox/prototype/distribution/src/assemble/bin.xml
@@ -0,0 +1,81 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>bin</id>
+  <formats>
+    <format>tar.gz</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>true</includeBaseDirectory>
+  <moduleSets>
+    <moduleSet>
+    
+      <!-- Enable access to all projects in the current multimodule build! -->
+      <useAllReactorProjects>true</useAllReactorProjects>
+      
+      <!-- Now, select which projects to include in this module-set. -->
+      <includes>
+        <include>org.apache.drill:planner:jar</include>
+        <include>org.apache.drill:sqlparser:jar</include>
+        <include>org.apache.drill.exec:netty-bufferl</include>
+        <include>org.apache.drill.exec:ref</include>
+      </includes>
+      <binaries>
+        <outputDirectory>jars</outputDirectory>
+        <unpack>false</unpack>
+      </binaries>
+    </moduleSet>
+  </moduleSets>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>lib</outputDirectory>
+      <unpack>false</unpack>
+      <useProjectArtifact>false</useProjectArtifact>
+      <excludes>
+        <exclude>org.apache.drill</exclude>
+        <exclude>org.apache.drill.exec</exclude>
+        <exclude>com.google.protobuf</exclude>
+        <exclude>de.huxhorn.*</exclude>
+      </excludes>
+      <scope>test</scope>
+    </dependencySet>
+  </dependencySets>
+  <files>
+    <file>
+      <source>../common/target/common-1.0-SNAPSHOT-rebuffed.jar</source>
+      <outputDirectory>jars</outputDirectory>
+    </file>
+    <file>
+      <source>../exec/java-exec/target/java-exec-1.0-SNAPSHOT-rebuffed.jar</source>
+      <outputDirectory>jars</outputDirectory>
+    </file>
+    <file>
+      <source>src/resources/runbit</source>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
+      <source>src/resources/drillbit.sh</source>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
+      <source>src/resources/drill-config.sh</source>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
+      <source>src/resources/sqlline</source>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
+      <source>src/resources/drill-override.conf</source>
+      <outputDirectory>conf</outputDirectory>
+    </file>
+    <file>
+      <source>src/resources/logback.xml</source>
+      <outputDirectory>conf</outputDirectory>
+    </file>
+    <file>
+      <source>src/resources/drill-env.sh</source>
+      <outputDirectory>conf</outputDirectory>
+    </file>
+  </files>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/deb/control/conffiles
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/deb/control/conffiles b/sandbox/prototype/distribution/src/deb/control/conffiles
new file mode 100644
index 0000000..a9d1fda
--- /dev/null
+++ b/sandbox/prototype/distribution/src/deb/control/conffiles
@@ -0,0 +1,3 @@
+/etc/drill/conf/drill-override.conf
+/etc/drill/conf/logback.xml
+/etc/drill/conf/drill-env.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/deb/control/control
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/deb/control/control b/sandbox/prototype/distribution/src/deb/control/control
new file mode 100644
index 0000000..8a6b69b
--- /dev/null
+++ b/sandbox/prototype/distribution/src/deb/control/control
@@ -0,0 +1,8 @@
+Package: drill
+Version: 1.0-SNAPSHOT
+Section: misc
+Priority: optional
+Architecture: all
+Maintainer: name <>
+Description: Apache Drill
+Distribution: development
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/resources/drill-config.sh
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/resources/drill-config.sh b/sandbox/prototype/distribution/src/resources/drill-config.sh
new file mode 100644
index 0000000..20102fc
--- /dev/null
+++ b/sandbox/prototype/distribution/src/resources/drill-config.sh
@@ -0,0 +1,96 @@
+#
+#/**
+# * Copyright 2013 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# included in all the drill scripts with source command
+# should not be executable directly
+# also should not be passed any arguments, since we need original $*
+# Modelled after $HADOOP_HOME/bin/hadoop-env.sh.
+
+# resolve links - "${BASH_SOURCE-$0}" may be a softlink
+
+this="${BASH_SOURCE-$0}"
+while [ -h "$this" ]; do
+  ls=`ls -ld "$this"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '.*/.*' > /dev/null; then
+    this="$link"
+  else
+    this=`dirname "$this"`/"$link"
+  fi
+done
+
+# convert relative path to absolute path
+bin=`dirname "$this"`
+script=`basename "$this"`
+bin=`cd "$bin">/dev/null; pwd`
+this="$bin/$script"
+
+# the root of the drill installation
+if [ -z "$DRILL_HOME" ]; then
+  export DRILL_HOME=`dirname "$this"`/..
+fi
+
+#check to see if the conf dir or drill home are given as an optional arguments
+while [ $# -gt 1 ]
+do
+  if [ "--config" = "$1" ]
+  then
+    shift
+    confdir=$1
+    shift
+    DRILL_CONF_DIR=$confdir
+  else
+    # Presume we are at end of options and break
+    break
+  fi
+done
+ 
+# Allow alternate drill conf dir location.
+export DRILL_CONF_DIR="${DRILL_CONF_DIR:-/etc/drill/conf}"
+
+. "${DRILL_CONF_DIR}/drill-env.sh"
+
+# Newer versions of glibc use an arena memory allocator that causes virtual
+# memory usage to explode. Tune the variable down to prevent vmem explosion.
+export MALLOC_ARENA_MAX=${MALLOC_ARENA_MAX:-4}
+
+if [ -z "$JAVA_HOME" ]; then
+  SOURCE=`which java 2>/dev/null` # empty when java is not on the PATH
+  if [ -n "$SOURCE" ] && [ -e "$SOURCE" ]; then # an unquoted empty result made '[ -e ]' succeed
+    while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
+      DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
+      SOURCE="$(readlink "$SOURCE")"
+      [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
+    done
+    JAVA_HOME="$( cd -P "$( dirname "$SOURCE" )" && cd .. && pwd )"
+  fi
+  # if we didn't set it, report the problem on stderr and abort
+  if [ -z "$JAVA_HOME" ]; then
+    cat 1>&2 <<EOF
++======================================================================+
+|      Error: JAVA_HOME is not set and Java could not be found         |
++----------------------------------------------------------------------+
+| Drill requires Java 1.7 or later.                                    |
++======================================================================+
+EOF
+    exit 1
+  fi
+fi

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/resources/drill-env.sh
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/resources/drill-env.sh b/sandbox/prototype/distribution/src/resources/drill-env.sh
new file mode 100644
index 0000000..8bfb66b
--- /dev/null
+++ b/sandbox/prototype/distribution/src/resources/drill-env.sh
@@ -0,0 +1,2 @@
+DRILL_MAX_DIRECT_MEMORY="8G"
+export DRILL_JAVA_OPTS="-XX:MaxDirectMemorySize=$DRILL_MAX_DIRECT_MEMORY"

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/resources/drill-override.conf b/sandbox/prototype/distribution/src/resources/drill-override.conf
new file mode 100644
index 0000000..30305af
--- /dev/null
+++ b/sandbox/prototype/distribution/src/resources/drill-override.conf
@@ -0,0 +1,36 @@
+//  This file tells Drill to consider this module when class path scanning.  
+//  This file can also include any supplementary configuration information.  
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.exec: {
+  cluster-id: "drillbits1"
+  rpc: {
+  	user.port : 31010,
+  	bit.port : 32011
+  },
+  operator: {
+    packages += "org.apache.drill.exec.physical.config"
+  },
+  optimizer: {
+    implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
+  },
+  storage: {
+	packages += "org.apache.drill.exec.store"
+  }
+  metrics : {
+  	context: "drillbit"
+  },
+  zk: {
+	connect: "localhost:2181",
+	root: "/drill",
+	refresh: 500,
+	timeout: 5000,
+	retry: {
+	  count: 7200,
+	  delay: 500
+	}
+  }
+
+  network: {
+    start: 35000
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/resources/drillbit
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/resources/drillbit b/sandbox/prototype/distribution/src/resources/drillbit
new file mode 100755
index 0000000..f235ed3
--- /dev/null
+++ b/sandbox/prototype/distribution/src/resources/drillbit
@@ -0,0 +1,166 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Starts a Drillbit
+#
+# chkconfig: 345 85 15
+# description: Drillbit
+#
+### BEGIN INIT INFO
+# Provides:          drillbit
+# Short-Description: Drillbit
+# Default-Start:     3 4 5
+# Default-Stop:      0 1 2 6
+# Required-Start:    $syslog $remote_fs
+# Required-Stop:     $syslog $remote_fs
+# Should-Start:
+# Should-Stop:
+### END INIT INFO
+
+. /lib/lsb/init-functions
+RETVAL_SUCCESS=0
+
+STATUS_RUNNING=0
+STATUS_DEAD=1
+STATUS_DEAD_AND_LOCK=2
+STATUS_NOT_RUNNING=3
+STATUS_OTHER_ERROR=102
+
+
+ERROR_PROGRAM_NOT_INSTALLED=5
+ERROR_PROGRAM_NOT_CONFIGURED=6
+
+
+RETVAL=0
+SLEEP_TIME=5
+PROC_NAME="java"
+
+DAEMON="drillbit"
+DESC="Drillbit"
+EXEC_PATH="/opt/drill/bin/drillbit.sh"
+SVC_USER=""
+DAEMON_FLAGS=""
+CONF_DIR="/etc/drill/conf"
+PIDFILE="/opt/drill/drillbit.pid"
+LOCKDIR="/var/lock/subsys"
+LOCKFILE="$LOCKDIR/drillbit"
+WORKING_DIR="~/"
+
+install -d -m 0755 ${SVC_USER:+-o "$SVC_USER"} /var/run/drillbit 1>/dev/null 2>&1 || : # pass -o only when SVC_USER is set; the empty '-o  -g' made install always fail
+[ -d "$LOCKDIR" ] || install -d -m 0755 "$LOCKDIR" 1>/dev/null 2>&1 || :
+
+start() {
+  [ -x $EXEC_PATH ] || exit $ERROR_PROGRAM_NOT_INSTALLED
+  [ -d $CONF_DIR ] || exit $ERROR_PROGRAM_NOT_CONFIGURED
+  log_success_msg "Starting ${DESC}: "
+
+  su -s /bin/bash $SVC_USER -c "cd $WORKING_DIR && $EXEC_PATH --config '$CONF_DIR' start $DAEMON_FLAGS"
+
+  # Some processes are slow to start
+  sleep $SLEEP_TIME
+  checkstatusofproc
+  RETVAL=$?
+
+  [ $RETVAL -eq $RETVAL_SUCCESS ] && touch $LOCKFILE
+  return $RETVAL
+}
+
+
+stop() {
+  log_success_msg "Stopping ${DESC}: "
+  start_daemon $EXEC_PATH --config "$CONF_DIR" stop $DAEMON_FLAGS
+  RETVAL=$?
+
+  [ $RETVAL -eq $RETVAL_SUCCESS ] && rm -f $LOCKFILE $PIDFILE
+}
+
+restart() {
+  stop
+  start
+}
+
+checkstatusofproc(){
+  pidofproc -p $PIDFILE $PROC_NAME > /dev/null
+}
+
+checkstatus(){
+  checkstatusofproc
+  status=$?
+
+  case "$status" in
+    $STATUS_RUNNING)
+      log_success_msg "${DESC} is running"
+      ;;
+    $STATUS_DEAD)
+      log_failure_msg "${DESC} is dead and pid file exists"
+      ;;
+    $STATUS_DEAD_AND_LOCK)
+      log_failure_msg "${DESC} is dead and lock file exists"
+      ;;
+    $STATUS_NOT_RUNNING)
+      log_failure_msg "${DESC} is not running"
+      ;;
+    *)
+      log_failure_msg "${DESC} status is unknown"
+      ;;
+  esac
+  return $status
+}
+
+condrestart(){
+  [ -e $LOCKFILE ] && restart || :
+}
+
+check_for_root() {
+  if [ $(id -ur) -ne 0 ]; then
+    echo 'Error: root user required'
+    echo
+    exit 1
+  fi
+}
+
+service() {
+  case "$1" in
+    start)
+      check_for_root
+      start
+      ;;
+    stop)
+      check_for_root
+      stop
+      ;;
+    status)
+      checkstatus
+      RETVAL=$?
+      ;;
+    restart)
+      check_for_root
+      restart
+      ;;
+    condrestart|try-restart)
+      check_for_root
+      condrestart
+      ;;
+    *)
+      echo $"Usage: $0 {start|stop|status|restart|try-restart|condrestart}"
+      exit 1
+  esac
+}
+
+service "$1"
+
+exit $RETVAL

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/resources/drillbit.sh
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/resources/drillbit.sh b/sandbox/prototype/distribution/src/resources/drillbit.sh
new file mode 100755
index 0000000..f5293a6
--- /dev/null
+++ b/sandbox/prototype/distribution/src/resources/drillbit.sh
@@ -0,0 +1,230 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2013 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+# 
+# Environment Variables
+#
+#   DRILL_CONF_DIR   Alternate drill conf dir. Default is /etc/drill/conf.
+#   DRILL_LOG_DIR    Where log files are stored.  /var/log/drill by default.
+#   DRILL_PID_DIR    Where the pid file is stored. $DRILL_HOME by default.
+#   DRILL_IDENT_STRING   A string representing this instance of drillbit. $USER by default
+#   DRILL_NICENESS The scheduling priority for daemons. Defaults to 0.
+#   DRILL_STOP_TIMEOUT  Time, in seconds, after which we kill -9 the server if it has not stopped.
+#                        Default 1200 seconds.
+#
+# Modelled after $HADOOP_HOME/bin/hadoop-daemon.sh
+
+usage="Usage: drillbit.sh [--config <conf-dir>]\
+ (start|stop|restart|autorestart)"
+
+# if no args specified, show usage
+if [ $# -lt 1 ]; then
+  echo $usage
+  exit 1
+fi
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+. "$bin"/drill-config.sh
+
+# get arguments
+startStop=$1
+shift
+
+command=drillbit
+shift
+
+waitForProcessEnd() {
+  pidKilled=$1
+  commandName=$2
+  processedAt=`date +%s`
+  while kill -0 $pidKilled > /dev/null 2>&1;
+   do
+     echo -n "."
+     sleep 1;
+     # if process persists more than $DRILL_STOP_TIMEOUT (default 1200 sec) no mercy
+     if [ $(( `date +%s` - $processedAt )) -gt ${DRILL_STOP_TIMEOUT:-1200} ]; then
+       break;
+     fi
+   done
+  # process still there : kill -9
+  if kill -0 $pidKilled > /dev/null 2>&1; then
+    echo -n force stopping $commandName with kill -9 $pidKilled
+    $JAVA_HOME/bin/jstack -l $pidKilled > "$logout" 2>&1
+    kill -9 $pidKilled > /dev/null 2>&1
+  fi
+  # Add a CR after we're done w/ dots.
+  echo
+}
+
+drill_rotate_log ()
+{
+    log=$1;
+    num=5;
+    if [ -n "$2" ]; then
+    num=$2
+    fi
+    if [ -f "$log" ]; then # rotate logs
+    while [ $num -gt 1 ]; do
+        prev=`expr $num - 1`
+        [ -f "$log.$prev" ] && mv -f "$log.$prev" "$log.$num"
+        num=$prev
+    done
+    mv -f "$log" "$log.$num";
+    fi
+}
+
+check_before_start(){
+    # Check that the process is not already running; exits 1 if the pid file names a live process.
+    mkdir -p "$DRILL_PID_DIR"
+    if [ -f $pid ]; then
+      if kill -0 `cat $pid` > /dev/null 2>&1; then # kill -0 only probes whether the pid exists
+        echo $command running as process `cat $pid`.  Stop it first.
+        exit 1
+      fi
+    fi
+}
+
+wait_until_done ()
+{
+    p=$1
+    cnt=${DRILLBIT_TIMEOUT:-300}
+    origcnt=$cnt
+    while kill -0 $p > /dev/null 2>&1; do
+      if [ $cnt -gt 1 ]; then
+        cnt=`expr $cnt - 1`
+        sleep 1
+      else
+        echo "Process did not complete after $origcnt seconds, killing."
+        kill -9 $p
+        exit 1
+      fi
+    done
+    return 0
+}
+
+# get log directory
+if [ "$DRILL_LOG_DIR" = "" ]; then
+  export DRILL_LOG_DIR=/var/log/drill
+fi
+mkdir -p "$DRILL_LOG_DIR"
+
+if [ "$DRILL_PID_DIR" = "" ]; then
+  DRILL_PID_DIR=$DRILL_HOME
+fi
+
+# Some variables
+# Work out java location so can print version into log.
+if [ "$JAVA_HOME" != "" ]; then
+  #echo "run java in $JAVA_HOME"
+  JAVA_HOME=$JAVA_HOME
+fi
+if [ "$JAVA_HOME" = "" ]; then
+  echo "Error: JAVA_HOME is not set."
+  exit 1
+fi
+
+JAVA=$JAVA_HOME/bin/java
+export DRILL_LOG_PREFIX=drillbit
+export DRILL_LOGFILE=$DRILL_LOG_PREFIX.log
+export DRILL_OUTFILE=$DRILL_LOG_PREFIX.out
+loggc=$DRILL_LOG_DIR/$DRILL_LOG_PREFIX.gc
+loglog="${DRILL_LOG_DIR}/${DRILL_LOGFILE}"
+logout="${DRILL_LOG_DIR}/${DRILL_OUTFILE}"
+pid=$DRILL_PID_DIR/drillbit.pid
+
+DRILL_JAVA_OPTS="$DRILL_JAVA_OPTS -Dlog.path=$loglog"
+
+if [ -n "$SERVER_GC_OPTS" ]; then
+  export SERVER_GC_OPTS=${SERVER_GC_OPTS/"-Xloggc:<FILE-PATH>"/"-Xloggc:${loggc}"}
+fi
+if [ -n "$CLIENT_GC_OPTS" ]; then
+  export CLIENT_GC_OPTS=${CLIENT_GC_OPTS/"-Xloggc:<FILE-PATH>"/"-Xloggc:${loggc}"}
+fi
+
+# Set default scheduling priority
+if [ "$DRILL_NICENESS" = "" ]; then
+    export DRILL_NICENESS=0
+fi
+
+thiscmd=$0
+args=$@
+
+case $startStop in
+
+(start)
+    check_before_start
+    echo starting $command, logging to $logout
+    nohup $thiscmd internal_start $command $args < /dev/null >> ${logout} 2>&1  &
+    sleep 1;
+  ;;
+
+(internal_start)
+    drill_rotate_log $loggc
+    # Add to the command log file vital stats on our environment.
+    echo "`date` Starting $command on `hostname`" >> $loglog
+    echo "`ulimit -a`" >> $loglog 2>&1
+    nice -n $DRILL_NICENESS "$DRILL_HOME"/bin/runbit \
+        $command "$@" start >> "$logout" 2>&1 &
+    echo $! > $pid
+    wait
+  ;;
+
+(stop)
+    rm -f "$DRILL_START_FILE"
+    if [ -f $pid ]; then
+      pidToKill=`cat $pid`
+      # kill -0 == see if the PID exists
+      if kill -0 $pidToKill > /dev/null 2>&1; then
+        echo stopping $command
+        echo "`date` Terminating $command" pid $pidToKill>> $loglog
+        kill $pidToKill > /dev/null 2>&1
+        waitForProcessEnd $pidToKill $command
+        rm $pid
+      else
+        retval=$?
+        echo no $command to stop because kill -0 of pid $pidToKill failed with status $retval
+      fi
+    else
+      echo no $command to stop because no pid file $pid
+    fi
+  ;;
+
+(restart)
+    # stop the command
+    $thiscmd --config "${DRILL_CONF_DIR}" stop $command $args &
+    wait_until_done $!
+    # wait a user-specified sleep period
+    sp=${DRILL_RESTART_SLEEP:-3}
+    if [ $sp -gt 0 ]; then
+      sleep $sp
+    fi
+    # start the command
+    $thiscmd --config "${DRILL_CONF_DIR}" start $command $args &
+    wait_until_done $!
+  ;;
+
+(*)
+  echo $usage
+  exit 1
+  ;;
+esac

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/resources/logback.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/resources/logback.xml b/sandbox/prototype/distribution/src/resources/logback.xml
new file mode 100644
index 0000000..8f0d410
--- /dev/null
+++ b/sandbox/prototype/distribution/src/resources/logback.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<configuration>
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+      <file>${log.path}</file>
+      <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+        <fileNamePattern>${log.path}.%i</fileNamePattern>
+        <minIndex>1</minIndex>
+        <maxIndex>10</maxIndex>
+      </rollingPolicy>
+
+      <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+        <maxFileSize>100MB</maxFileSize>
+      </triggeringPolicy>
+      <encoder>
+        <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+      </encoder>
+    </appender>
+
+	<logger name="org.apache.drill" additivity="false">
+		<level value="info" />
+    <appender-ref ref="FILE" />
+	</logger>
+
+	<root>
+		<level value="error" />
+		<appender-ref ref="STDOUT" />
+	</root>  
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/resources/runbit
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/resources/runbit b/sandbox/prototype/distribution/src/resources/runbit
new file mode 100755
index 0000000..4e1b599
--- /dev/null
+++ b/sandbox/prototype/distribution/src/resources/runbit
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Launches org.apache.drill.exec.server.Drillbit with the Drill jars, libs and conf dir on the classpath.
+
+if [ -z "$JAVA_HOME" ]
+then
+  JAVA=`which java`
+else
+  JAVA=$JAVA_HOME/bin/java
+fi
+
+if [ -e "$JAVA" ]; then # quoted: an empty $JAVA (java not on PATH) must fail this test
+  echo ""
+else
+  echo "Java not found."
+  exit 1
+fi
+
+"$JAVA" -version 2>&1 | grep "version" | egrep -e "1.7" > /dev/null
+if [ $? -ne 0 ]; then
+  echo "Java 1.7 is required to run Apache Drill."
+  exit 1
+fi
+
+
+CP=$DRILL_HOME/jars/*:$CP
+CP=$DRILL_HOME/lib/*:$CP
+
+CP=$CP:$DRILL_CONF_DIR
+
+exec "$JAVA" $DRILL_JAVA_OPTS -cp "$CP" org.apache.drill.exec.server.Drillbit

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/distribution/src/resources/sqlline
----------------------------------------------------------------------
diff --git a/sandbox/prototype/distribution/src/resources/sqlline b/sandbox/prototype/distribution/src/resources/sqlline
new file mode 100755
index 0000000..973da49
--- /dev/null
+++ b/sandbox/prototype/distribution/src/resources/sqlline
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Launches sqlline.SqlLine with the Drill jars, libs and conf dir on the classpath.
+
+if [ -z "$JAVA_HOME" ]
+then
+  JAVA=`which java`
+else
+  JAVA=$JAVA_HOME/bin/java
+fi
+
+if [ -e "$JAVA" ]; then # quoted: an empty $JAVA (java not on PATH) must fail this test
+  echo ""
+else
+  echo "Java not found."
+  exit 1
+fi
+
+"$JAVA" -version 2>&1 | grep "version" | egrep -e "1.7" > /dev/null
+if [ $? -ne 0 ]; then
+  echo "Java 1.7 is required to run Apache Drill."
+  exit 1
+fi
+
+
+CP=$DRILL_HOME/jars/*:$CP
+CP=$DRILL_HOME/lib/*:$CP
+
+CP=$CP:$DRILL_CONF_DIR
+
+exec "$JAVA" $DRILL_JAVA_OPTS -cp "$CP" sqlline.SqlLine "$@" # the java binary was missing from this command

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
index a590420..d9e1ef5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
@@ -16,7 +16,7 @@ drill.exec: {
   },
   
   zk: {
-	connect: "10.10.30.52:5181",
+	connect: "localhost:2181",
 	root: "/drill",
 	refresh: 500,
 	timeout: 5000,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/exec/java-exec/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/logback.xml b/sandbox/prototype/exec/java-exec/src/main/resources/logback.xml
new file mode 100644
index 0000000..feb75b3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/logback.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<configuration>
+	<appender name="SOCKET"
+		class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+		<Compressing>true</Compressing>
+		<ReconnectionDelay>10000</ReconnectionDelay>
+		<IncludeCallerData>true</IncludeCallerData>
+		<RemoteHosts>localhost</RemoteHosts>
+	</appender>
+
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+      <file>${log.path}</file>
+      <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+        <fileNamePattern>${log.path}.%i</fileNamePattern>
+        <minIndex>1</minIndex>
+        <maxIndex>10</maxIndex>
+      </rollingPolicy>
+
+      <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+        <maxFileSize>100MB</maxFileSize>
+      </triggeringPolicy>
+      <encoder>
+        <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+      </encoder>
+    </appender>
+
+	<logger name="org.apache.drill" additivity="false">
+		<level value="info" />
+    <appender-ref ref="FILE" />
+	</logger>
+
+  <logger name="org.apache.drill" additivity="false">
+    <level value="debug" />
+    <appender-ref ref="SOCKET" />
+  </logger>
+
+	<root>
+		<level value="error" />
+		<appender-ref ref="STDOUT" />
+	</root>  
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/exec/java-exec/src/main/sh/drill-config.sh
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/sh/drill-config.sh b/sandbox/prototype/exec/java-exec/src/main/sh/drill-config.sh
new file mode 100644
index 0000000..20102fc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/sh/drill-config.sh
@@ -0,0 +1,96 @@
+#
+#/**
+# * Copyright 2013 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# included in all the drill scripts with source command
+# should not be executable directly
+# also should not be passed any arguments, since we need original $*
+# Modelled after $HADOOP_HOME/bin/hadoop-env.sh.
+
+# resolve links - "${BASH_SOURCE-$0}" may be a softlink
+
+this="${BASH_SOURCE-$0}"
+while [ -h "$this" ]; do
+  ls=`ls -ld "$this"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '.*/.*' > /dev/null; then
+    this="$link"
+  else
+    this=`dirname "$this"`/"$link"
+  fi
+done
+
+# convert relative path to absolute path
+bin=`dirname "$this"`
+script=`basename "$this"`
+bin=`cd "$bin">/dev/null; pwd`
+this="$bin/$script"
+
+# the root of the drill installation
+if [ -z "$DRILL_HOME" ]; then
+  export DRILL_HOME=`dirname "$this"`/..
+fi
+
+#check to see if the conf dir or drill home are given as an optional arguments
+while [ $# -gt 1 ]
+do
+  if [ "--config" = "$1" ]
+  then
+    shift
+    confdir=$1
+    shift
+    DRILL_CONF_DIR=$confdir
+  else
+    # Presume we are at end of options and break
+    break
+  fi
+done
+ 
+# Allow alternate drill conf dir location.
+export DRILL_CONF_DIR="${DRILL_CONF_DIR:-/etc/drill/conf}"
+
+. "${DRILL_CONF_DIR}/drill-env.sh"
+
+# Newer versions of glibc use an arena memory allocator that causes virtual
+# memory usage to explode. Tune the variable down to prevent vmem explosion.
+export MALLOC_ARENA_MAX=${MALLOC_ARENA_MAX:-4}
+
+if [ -z "$JAVA_HOME" ]; then
+  SOURCE=`which java 2>/dev/null` # empty when java is not on the PATH
+  if [ -n "$SOURCE" ] && [ -e "$SOURCE" ]; then # an unquoted empty result made '[ -e ]' succeed
+    while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
+      DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
+      SOURCE="$(readlink "$SOURCE")"
+      [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
+    done
+    JAVA_HOME="$( cd -P "$( dirname "$SOURCE" )" && cd .. && pwd )"
+  fi
+  # if we didn't set it, report the problem on stderr and abort
+  if [ -z "$JAVA_HOME" ]; then
+    cat 1>&2 <<EOF
++======================================================================+
+|      Error: JAVA_HOME is not set and Java could not be found         |
++----------------------------------------------------------------------+
+| Drill requires Java 1.7 or later.                                    |
++======================================================================+
+EOF
+    exit 1
+  fi
+fi

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/exec/java-exec/src/main/sh/drillbit.sh
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/sh/drillbit.sh b/sandbox/prototype/exec/java-exec/src/main/sh/drillbit.sh
new file mode 100755
index 0000000..f5293a6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/sh/drillbit.sh
@@ -0,0 +1,230 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2013 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+# 
+# Environment Variables
+#
+#   DRILL_CONF_DIR   Alternate drill conf dir. Default is /etc/drill/conf.
+#   DRILL_LOG_DIR    Where log files are stored.  /var/log/drill by default.
+#   DRILL_PID_DIR    Where the pid file is stored. $DRILL_HOME by default.
+#   DRILL_IDENT_STRING   A string representing this instance of drillbit. $USER by default
+#   DRILL_NICENESS The scheduling priority for daemons. Defaults to 0.
+#   DRILL_STOP_TIMEOUT  Time, in seconds, after which we kill -9 the server if it has not stopped.
+#                        Default 1200 seconds.
+#
+# Modelled after $HADOOP_HOME/bin/hadoop-daemon.sh
+
+usage="Usage: drillbit.sh [--config <conf-dir>]\
+ (start|stop|restart|autorestart)"
+
+# if no args specified, show usage
+if [ $# -lt 1 ]; then
+  echo $usage
+  exit 1
+fi
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+. "$bin"/drill-config.sh
+
+# get arguments
+startStop=$1
+shift
+
+command=drillbit
+shift
+
+waitForProcessEnd() {
+  pidKilled=$1
+  commandName=$2
+  processedAt=`date +%s`
+  while kill -0 $pidKilled > /dev/null 2>&1;
+   do
+     echo -n "."
+     sleep 1;
+     # if process persists more than $DRILL_STOP_TIMEOUT (default 1200 sec) no mercy
+     if [ $(( `date +%s` - $processedAt )) -gt ${DRILL_STOP_TIMEOUT:-1200} ]; then
+       break;
+     fi
+   done
+  # process still there : kill -9
+  if kill -0 $pidKilled > /dev/null 2>&1; then
+    echo -n force stopping $commandName with kill -9 $pidKilled
+    $JAVA_HOME/bin/jstack -l $pidKilled > "$logout" 2>&1
+    kill -9 $pidKilled > /dev/null 2>&1
+  fi
+  # Add a CR after we're done w/ dots.
+  echo
+}
+
+drill_rotate_log ()
+{
+    log=$1;
+    num=5;
+    if [ -n "$2" ]; then
+    num=$2
+    fi
+    if [ -f "$log" ]; then # rotate logs
+    while [ $num -gt 1 ]; do
+        prev=`expr $num - 1`
+        [ -f "$log.$prev" ] && mv -f "$log.$prev" "$log.$num"
+        num=$prev
+    done
+    mv -f "$log" "$log.$num";
+    fi
+}
+
+check_before_start(){
+    # Check that the process is not already running; exits 1 if the pid file names a live process.
+    mkdir -p "$DRILL_PID_DIR"
+    if [ -f $pid ]; then
+      if kill -0 `cat $pid` > /dev/null 2>&1; then # kill -0 only probes whether the pid exists
+        echo $command running as process `cat $pid`.  Stop it first.
+        exit 1
+      fi
+    fi
+}
+
+wait_until_done ()
+{
+    p=$1
+    cnt=${DRILLBIT_TIMEOUT:-300}
+    origcnt=$cnt
+    while kill -0 $p > /dev/null 2>&1; do
+      if [ $cnt -gt 1 ]; then
+        cnt=`expr $cnt - 1`
+        sleep 1
+      else
+        echo "Process did not complete after $origcnt seconds, killing."
+        kill -9 $p
+        exit 1
+      fi
+    done
+    return 0
+}
+
+# get log directory
+if [ "$DRILL_LOG_DIR" = "" ]; then
+  export DRILL_LOG_DIR=/var/log/drill
+fi
+mkdir -p "$DRILL_LOG_DIR"
+
+if [ "$DRILL_PID_DIR" = "" ]; then
+  DRILL_PID_DIR=$DRILL_HOME
+fi
+
+# Some variables
+# Work out java location so can print version into log.
+if [ "$JAVA_HOME" != "" ]; then
+  #echo "run java in $JAVA_HOME"
+  JAVA_HOME=$JAVA_HOME
+fi
+if [ "$JAVA_HOME" = "" ]; then
+  echo "Error: JAVA_HOME is not set."
+  exit 1
+fi
+
+JAVA=$JAVA_HOME/bin/java
+export DRILL_LOG_PREFIX=drillbit
+export DRILL_LOGFILE=$DRILL_LOG_PREFIX.log
+export DRILL_OUTFILE=$DRILL_LOG_PREFIX.out
+loggc=$DRILL_LOG_DIR/$DRILL_LOG_PREFIX.gc
+loglog="${DRILL_LOG_DIR}/${DRILL_LOGFILE}"
+logout="${DRILL_LOG_DIR}/${DRILL_OUTFILE}"
+pid=$DRILL_PID_DIR/drillbit.pid
+
+DRILL_JAVA_OPTS="$DRILL_JAVA_OPTS -Dlog.path=$loglog"
+
+if [ -n "$SERVER_GC_OPTS" ]; then
+  export SERVER_GC_OPTS=${SERVER_GC_OPTS/"-Xloggc:<FILE-PATH>"/"-Xloggc:${loggc}"}
+fi
+if [ -n "$CLIENT_GC_OPTS" ]; then
+  export CLIENT_GC_OPTS=${CLIENT_GC_OPTS/"-Xloggc:<FILE-PATH>"/"-Xloggc:${loggc}"}
+fi
+
+# Set default scheduling priority
+if [ "$DRILL_NICENESS" = "" ]; then
+    export DRILL_NICENESS=0
+fi
+
+thiscmd=$0
+args=$@
+
+case $startStop in
+
+(start)
+    check_before_start
+    echo starting $command, logging to $logout
+    nohup $thiscmd internal_start $command $args < /dev/null >> ${logout} 2>&1  &
+    sleep 1;
+  ;;
+
+(internal_start)
+    drill_rotate_log $loggc
+    # Add to the command log file vital stats on our environment.
+    echo "`date` Starting $command on `hostname`" >> $loglog
+    echo "`ulimit -a`" >> $loglog 2>&1
+    nice -n $DRILL_NICENESS "$DRILL_HOME"/bin/runbit \
+        $command "$@" start >> "$logout" 2>&1 &
+    echo $! > $pid
+    wait
+  ;;
+
+(stop)
+    rm -f "$DRILL_START_FILE"
+    if [ -f $pid ]; then
+      pidToKill=`cat $pid`
+      # kill -0 == see if the PID exists
+      if kill -0 $pidToKill > /dev/null 2>&1; then
+        echo stopping $command
+        echo "`date` Terminating $command" pid $pidToKill>> $loglog
+        kill $pidToKill > /dev/null 2>&1
+        waitForProcessEnd $pidToKill $command
+        rm $pid
+      else
+        retval=$?
+        echo no $command to stop because kill -0 of pid $pidToKill failed with status $retval
+      fi
+    else
+      echo no $command to stop because no pid file $pid
+    fi
+  ;;
+
+(restart)
+    # stop the command
+    $thiscmd --config "${DRILL_CONF_DIR}" stop $command $args &
+    wait_until_done $!
+    # wait a user-specified sleep period
+    sp=${DRILL_RESTART_SLEEP:-3}
+    if [ $sp -gt 0 ]; then
+      sleep $sp
+    fi
+    # start the command
+    $thiscmd --config "${DRILL_CONF_DIR}" start $command $args &
+    wait_until_done $!
+  ;;
+
+(*)
+  echo $usage
+  exit 1
+  ;;
+esac

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/exec/java-exec/src/main/sh/runbit
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/sh/runbit b/sandbox/prototype/exec/java-exec/src/main/sh/runbit
new file mode 100755
index 0000000..4e1b599
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/sh/runbit
@@ -0,0 +1,30 @@
+#!/bin/bash
+
+
+if [ -z $JAVA_HOME ]
+then
+  JAVA=`which java`
+else
+  JAVA=$JAVA_HOME/bin/java
+fi
+
+if [ -e $JAVA ]; then
+  echo ""
+else
+  echo "Java not found."
+  exit 1
+fi
+
+$JAVA -version 2>&1 | grep "version" | egrep -e "1.7" > /dev/null
+if [ $? -ne 0 ]; then
+  echo "Java 1.7 is required to run Apache Drill."
+  exit 1
+fi
+
+
+CP=$DRILL_HOME/jars/*:$CP
+CP=$DRILL_HOME/lib/*:$CP
+
+CP=$CP:$DRILL_CONF_DIR
+
+exec $JAVA $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.server.Drillbit

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2bcf0547/sandbox/prototype/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/pom.xml b/sandbox/prototype/pom.xml
index 76bdf24..dcc49a5 100644
--- a/sandbox/prototype/pom.xml
+++ b/sandbox/prototype/pom.xml
@@ -215,6 +215,15 @@
             </lifecycleMappingMetadata>
           </configuration>
         </plugin>
+        <plugin>
+          <artifactId>maven-assembly-plugin</artifactId>
+          <version>2.4</version>
+          <configuration>
+            <descriptors>
+              <descriptor>src/assemble/bin.xml</descriptor>
+            </descriptors>
+          </configuration>
+        </plugin>
       </plugins>
     </pluginManagement>
   </build>
@@ -292,5 +301,6 @@
     <module>exec</module>
     <module>planner</module>
     <module>sqlparser</module>
+    <module>distribution</module>
   </modules>
 </project>


[6/7] git commit: DRILL-187 move hadoop dependency to top level pom

Posted by ja...@apache.org.
DRILL-187 move hadoop dependency to top level pom


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/28414fe2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/28414fe2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/28414fe2

Branch: refs/heads/master
Commit: 28414fe2b9a9a6a05dd65306ca62406e6b72800b
Parents: 2bcf054
Author: Steven Phillips <sp...@maprtech.com>
Authored: Tue Aug 27 19:03:38 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Aug 28 20:37:57 2013 -0700

----------------------------------------------------------------------
 sandbox/prototype/exec/java-exec/pom.xml        | 52 ------------------
 sandbox/prototype/exec/ref/pom.xml              | 12 -----
 .../drill/exec/ref/values/BaseArrayValue.java   |  1 -
 .../drill/exec/ref/values/BaseMapValue.java     |  1 -
 .../drill/exec/ref/values/ScalarValues.java     |  1 -
 .../drill/exec/ref/values/SimpleMapValue.java   |  1 -
 .../drill/exec/ref/values/ValueUtils.java       |  1 -
 sandbox/prototype/pom.xml                       | 55 ++++++++++++++++++++
 8 files changed, 55 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28414fe2/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index e7d3f16..70e4147 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -116,58 +116,6 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
-      <version>1.2.1</version>
-      <exclusions>
-        <exclusion>
-          <artifactId>jets3t</artifactId>
-          <groupId>net.java.dev.jets3t</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>log4j</artifactId>
-          <groupId>log4j</groupId>
-        </exclusion>
-
-        <exclusion>
-          <artifactId>mockito-all</artifactId>
-          <groupId>org.mockito</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>commons-logging-api</artifactId>
-          <groupId>commons-logging</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>commons-logging</artifactId>
-          <groupId>commons-logging</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>slf4j-log4j12</artifactId>
-          <groupId>org.slf4j</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>servlet-api-2.5</artifactId>
-          <groupId>org.mortbay.jetty</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>jasper-runtime</artifactId>
-          <groupId>tomcat</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>jasper-compiler</artifactId>
-          <groupId>tomcat</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>jetty</artifactId>
-          <groupId>org.mortbay.jetty</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>jersey-server</artifactId>
-          <groupId>com.sun.jersey</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>core</artifactId>
-          <groupId>org.eclipse.jdt</groupId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.carrotsearch</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28414fe2/sandbox/prototype/exec/ref/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/pom.xml b/sandbox/prototype/exec/ref/pom.xml
index 956f095..4a81285 100644
--- a/sandbox/prototype/exec/ref/pom.xml
+++ b/sandbox/prototype/exec/ref/pom.xml
@@ -24,20 +24,8 @@
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-core</artifactId>
-			<version>1.1.1</version>
-			<exclusions>
-				<exclusion>
-					<artifactId>jets3t</artifactId>
-					<groupId>net.java.dev.jets3t</groupId>
-				</exclusion>
-				<exclusion>
-					<artifactId>commons-logging</artifactId>
-					<groupId>commons-logging</groupId>
-				</exclusion>
-			</exclusions>
 		</dependency>
 
-
 		
 		<dependency>
 			<groupId>com.carrotsearch</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28414fe2/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
index 5831d37..be808a3 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseArrayValue.java
@@ -21,7 +21,6 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.tools.ant.types.DataType;
 
 
 public abstract class BaseArrayValue extends BaseDataValue implements ContainerValue{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28414fe2/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
index 87bd344..4b7e192 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/BaseMapValue.java
@@ -25,7 +25,6 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ref.exceptions.RecordException;
-import org.apache.tools.ant.types.DataType;
 
 public abstract class BaseMapValue extends BaseDataValue implements ContainerValue,
     Iterable<Map.Entry<CharSequence, DataValue>> {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28414fe2/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
index 201c8fc..ba2e85a 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
 import org.apache.drill.exec.ref.rops.DataWriter;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.tools.ant.types.DataType;
 
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28414fe2/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
index ab231ff..ec11225 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java
@@ -27,7 +27,6 @@ import java.util.Map.Entry;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.exec.ref.exceptions.RecordException;
 import org.apache.drill.exec.ref.rops.DataWriter;
-import org.apache.tools.ant.types.DataType;
 
 public class SimpleMapValue extends BaseMapValue{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleMapValue.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28414fe2/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ValueUtils.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ValueUtils.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ValueUtils.java
index dff8b11..11617a4 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ValueUtils.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ValueUtils.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ref.exceptions.RecordException;
-import org.apache.tools.ant.types.DataType;
 
 public class ValueUtils {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueUtils.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/28414fe2/sandbox/prototype/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/pom.xml b/sandbox/prototype/pom.xml
index dcc49a5..98b2232 100644
--- a/sandbox/prototype/pom.xml
+++ b/sandbox/prototype/pom.xml
@@ -292,7 +292,62 @@
   <!-- Managed Dependencies -->
   <dependencyManagement>
     <dependencies>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-core</artifactId>
+        <version>1.2.1</version>
+        <exclusions>
+          <exclusion>
+            <artifactId>jets3t</artifactId>
+            <groupId>net.java.dev.jets3t</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>log4j</artifactId>
+            <groupId>log4j</groupId>
+          </exclusion>
 
+          <exclusion>
+            <artifactId>mockito-all</artifactId>
+            <groupId>org.mockito</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>commons-logging-api</artifactId>
+            <groupId>commons-logging</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>commons-logging</artifactId>
+            <groupId>commons-logging</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>slf4j-log4j12</artifactId>
+            <groupId>org.slf4j</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>servlet-api-2.5</artifactId>
+            <groupId>org.mortbay.jetty</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>jasper-runtime</artifactId>
+            <groupId>tomcat</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>jasper-compiler</artifactId>
+            <groupId>tomcat</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>jetty</artifactId>
+            <groupId>org.mortbay.jetty</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>jersey-server</artifactId>
+            <groupId>com.sun.jersey</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>core</artifactId>
+            <groupId>org.eclipse.jdt</groupId>
+          </exclusion>
+        </exclusions>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <modules>


[4/7] git commit: DRILL-190 (part3) Fixes for integration between JSON changes, updated CodeGenerator and merge join

Posted by ja...@apache.org.
DRILL-190 (part3) Fixes for integration between JSON changes, updated CodeGenerator and merge join


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5232b0e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5232b0e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5232b0e1

Branch: refs/heads/master
Commit: 5232b0e1423deecb206e79d5978e4fe7db8197b6
Parents: e0bac2f
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Aug 28 17:48:36 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Aug 28 20:37:40 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/ScanBatch.java     |  2 +-
 .../physical/impl/aggregate/InternalBatch.java  |  2 +-
 .../exec/physical/impl/join/JoinEvaluator.java  | 10 ------
 .../physical/impl/join/JoinInnerSignature.java  | 35 --------------------
 .../exec/physical/impl/join/MergeJoinBatch.java |  4 +--
 .../partitionsender/OutgoingRecordBatch.java    |  2 +-
 .../drill/exec/record/AbstractRecordBatch.java  |  2 +-
 .../drill/exec/record/RecordBatchLoader.java    |  2 +-
 .../drill/exec/record/VectorContainer.java      |  2 +-
 .../exec/physical/impl/join/TestMergeJoin.java  |  3 +-
 .../test/resources/join/merge_single_batch.json | 20 +++++++----
 11 files changed, 24 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 192c03c..ae043ec 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -134,7 +134,7 @@ public class ScanBatch implements RecordBatch {
 
   @Override
   public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
-    return container.getVectorAccessor(fieldId, clazz);
+    return container.getValueAccessorById(fieldId, clazz);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 343dbe5..77dd682 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -59,7 +59,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
   }
   
   public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
-    return container.getVectorAccessor(fieldId, clazz);
+    return container.getValueAccessorById(fieldId, clazz);
   }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
deleted file mode 100644
index beb3e28..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinEvaluator.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.drill.exec.physical.impl.join;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.VectorContainer;
-
-public interface JoinEvaluator {
-  public abstract void doSetup(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
deleted file mode 100644
index 1081244..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinInnerSignature.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.physical.impl.join;
-
-import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
-
-import javax.inject.Named;
-
-import org.apache.drill.exec.compile.sig.CodeGeneratorSignature;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.VectorContainer;
-
-
-public interface JoinInnerSignature extends CodeGeneratorSignature {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index a2b84da..af33ca4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -317,7 +317,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft",
                                                       new TypedFieldId(vw.getField().getType(), vectorId));
       JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
-                                                       new TypedFieldId(vw.getField().getType(),vectorId, true));
+                                                       new TypedFieldId(vw.getField().getType(),vectorId));
       // todo: check for room in vvOut
       cg.getEvalBlock().add(vvOut.invoke("copyFrom")
                                    .arg(COPY_LEFT_MAPPING.getValueReadIndex())
@@ -336,7 +336,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight",
                                                       new TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase));
       JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
-                                                       new TypedFieldId(vw.getField().getType(),vectorId, true));
+                                                       new TypedFieldId(vw.getField().getType(),vectorId));
       cg.getEvalBlock().add(vvOut.invoke("copyFrom")
           .arg(COPY_RIGHT_MAPPING.getValueReadIndex())
           .arg(COPY_RIGHT_MAPPING.getValueWriteIndex())

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 0cefc52..e429402 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -222,7 +222,7 @@ public class OutgoingRecordBatch implements RecordBatch {
 
   @Override
   public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
-    return vectorContainer.getVectorAccessor(fieldId, clazz);
+    return vectorContainer.getValueAccessorById(fieldId, clazz);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index a2584b8..ccd2468 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -65,7 +65,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
 
   @Override
   public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
-    return container.getVectorAccessor(fieldId, clazz);
+    return container.getValueAccessorById(fieldId, clazz);
   }
 
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 4d47404..9ac53f0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -140,7 +140,7 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
   }
 
   public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
-    return container.getVectorAccessor(fieldId, clazz);
+    return container.getValueAccessorById(fieldId, clazz);
   }
   
   public WritableBatch getWritableBatch(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 25036fc..e6c8bab 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -108,7 +108,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> {
   }
 
   @SuppressWarnings("unchecked")
-  public <T extends ValueVector> VectorWrapper<T> getVectorAccessor(int fieldId, Class<?> clazz) {
+  public <T extends ValueVector> VectorWrapper<T> getValueAccessorById(int fieldId, Class<?> clazz) {
     VectorWrapper<?> va = wrappers.get(fieldId);
     assert va != null;
     if (va.getVectorClass() != clazz) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
index 38b8225..6aa651b 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
@@ -112,9 +112,10 @@ public class TestMergeJoin {
     new NonStrictExpectations(){{
       bitContext.getMetrics(); result = new MetricRegistry("test");
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+      bitContext.getConfig(); result = c;
     }};
 
-    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(),CoordinationProtos.DrillbitEndpoint.getDefaultInstance(), new StorageEngineRegistry(bitContext));
     PhysicalPlan plan = reader.readPhysicalPlan(
         Files.toString(
             FileUtils.getResourceAsFile("/join/merge_single_batch.json"), Charsets.UTF_8)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5232b0e1/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
index 0e4f79d..ad33d26 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/join/merge_single_batch.json
@@ -10,16 +10,24 @@
     {
       @id:1,
       pop:"json-sub-scan",
-      entries:[
-        {url: "#{LEFT_FILE}"}
-      ]
+      readEntries:[
+        {path: "#{LEFT_FILE}"}
+      ],
+      engineConfig:{
+         "type":"json",
+         "dfsName" : "file:///"
+      }      
     },
     {
       @id:2,
       pop:"json-sub-scan",
-      entries:[
-        {url: "#{RIGHT_FILE}"}
-      ]
+      readEntries:[
+        {path: "#{RIGHT_FILE}"}
+      ],
+      engineConfig:{
+         "type":"json",
+         "dfsName" : "file:///"
+      }      
     },
     {
       @id: 3,


[7/7] git commit: DRILL-188 add profiles to build with hadoop artifacts from vendors: mapr, cloudera, hortonworks

Posted by ja...@apache.org.
DRILL-188 add profiles to build with hadoop artifacts from vendors: mapr, cloudera, hortonworks


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4515263d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4515263d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4515263d

Branch: refs/heads/master
Commit: 4515263d38e931275ab1f8f35ec761728c2b4156
Parents: 28414fe
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Aug 28 15:51:20 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Aug 28 20:38:47 2013 -0700

----------------------------------------------------------------------
 sandbox/prototype/exec/java-exec/pom.xml |  58 +++++-
 sandbox/prototype/exec/ref/pom.xml       |  45 ++++-
 sandbox/prototype/pom.xml                | 256 ++++++++++++++++++++------
 sandbox/prototype/sqlparser/pom.xml      |  42 +++++
 4 files changed, 331 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4515263d/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index 70e4147..a07e43a 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -52,6 +52,16 @@
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
       <version>1.0.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
@@ -114,10 +124,6 @@
       <version>1.0.5-M3</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
-    </dependency>
-    <dependency>
       <groupId>com.carrotsearch</groupId>
       <artifactId>hppc</artifactId>
       <version>0.4.2</version>
@@ -142,8 +148,52 @@
       <artifactId>janino</artifactId>
       <version>2.6.1</version>
     </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+      <version>6.1.26</version>
+    </dependency>
   </dependencies>
 
+  <profiles>
+    <profile>
+      <id>default-hadoop</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>mapr</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>cdh4</id>
+      <dependencies>
+        <dependency>
+          <artifactId>hadoop-common</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hdp</id>
+      <dependencies>
+        <dependency>
+          <artifactId>hadoop-common</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
   <build>
     <plugins>
       <plugin>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4515263d/sandbox/prototype/exec/ref/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/pom.xml b/sandbox/prototype/exec/ref/pom.xml
index 4a81285..5724015 100644
--- a/sandbox/prototype/exec/ref/pom.xml
+++ b/sandbox/prototype/exec/ref/pom.xml
@@ -22,12 +22,6 @@
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-		</dependency>
-
-		
-		<dependency>
 			<groupId>com.carrotsearch</groupId>
 			<artifactId>hppc</artifactId>
 			<version>0.4.2</version>
@@ -40,6 +34,45 @@
 		</dependency>
 	</dependencies>
 
+  <profiles>
+    <profile>
+      <id>default-hadoop</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>mapr</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>cdh4</id>
+      <dependencies>
+        <dependency>
+          <artifactId>hadoop-common</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hdp</id>
+      <dependencies>
+        <dependency>
+          <artifactId>hadoop-common</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
   <build>
     <plugins>
       <plugin>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4515263d/sandbox/prototype/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/pom.xml b/sandbox/prototype/pom.xml
index 98b2232..5a714df 100644
--- a/sandbox/prototype/pom.xml
+++ b/sandbox/prototype/pom.xml
@@ -285,71 +285,207 @@
       <scope>test</scope>
     </dependency>
 
-
-
   </dependencies>
 
   <!-- Managed Dependencies -->
   <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-core</artifactId>
-        <version>1.2.1</version>
-        <exclusions>
-          <exclusion>
-            <artifactId>jets3t</artifactId>
-            <groupId>net.java.dev.jets3t</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>log4j</artifactId>
-            <groupId>log4j</groupId>
-          </exclusion>
-
-          <exclusion>
-            <artifactId>mockito-all</artifactId>
-            <groupId>org.mockito</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>commons-logging-api</artifactId>
-            <groupId>commons-logging</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>commons-logging</artifactId>
-            <groupId>commons-logging</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>slf4j-log4j12</artifactId>
-            <groupId>org.slf4j</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>servlet-api-2.5</artifactId>
-            <groupId>org.mortbay.jetty</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>jasper-runtime</artifactId>
-            <groupId>tomcat</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>jasper-compiler</artifactId>
-            <groupId>tomcat</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>jetty</artifactId>
-            <groupId>org.mortbay.jetty</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>jersey-server</artifactId>
-            <groupId>com.sun.jersey</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>core</artifactId>
-            <groupId>org.eclipse.jdt</groupId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-    </dependencies>
   </dependencyManagement>
+
+  <profiles>
+    <profile>
+      <id>default-hadoop</id>
+      <activation>
+        <property>
+          <name>!alt-hadoop</name>
+        </property>
+      </activation>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>1.2.1</version>
+            <exclusions>
+              <exclusion>
+                <artifactId>jets3t</artifactId>
+                <groupId>net.java.dev.jets3t</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>log4j</artifactId>
+                <groupId>log4j</groupId>
+              </exclusion>
+
+              <exclusion>
+                <artifactId>mockito-all</artifactId>
+                <groupId>org.mockito</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>commons-logging-api</artifactId>
+                <groupId>commons-logging</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>commons-logging</artifactId>
+                <groupId>commons-logging</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>jersey-server</artifactId>
+                <groupId>com.sun.jersey</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>core</artifactId>
+                <groupId>org.eclipse.jdt</groupId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+    </profile>
+    <profile>
+      <id>mapr</id>
+      <properties>
+        <alt-hadoop>mapr</alt-hadoop>
+      </properties>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>com.mapr.hadoop</groupId>
+            <artifactId>maprfs</artifactId>
+            <version>1.0.3-mapr-3.0.0</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>1.0.3-mapr-3.0.0</version>
+            <exclusions>
+              <exclusion>
+                <artifactId>jets3t</artifactId>
+                <groupId>net.java.dev.jets3t</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>log4j</artifactId>
+                <groupId>log4j</groupId>
+              </exclusion>
+
+              <exclusion>
+                <artifactId>mockito-all</artifactId>
+                <groupId>org.mockito</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>commons-logging-api</artifactId>
+                <groupId>commons-logging</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>commons-logging</artifactId>
+                <groupId>commons-logging</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>slf4j-log4j12</artifactId>
+                <groupId>org.slf4j</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>servlet-api-2.5</artifactId>
+                <groupId>org.mortbay.jetty</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>jasper-runtime</artifactId>
+                <groupId>tomcat</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>jasper-compiler</artifactId>
+                <groupId>tomcat</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>jetty</artifactId>
+                <groupId>org.mortbay.jetty</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>jersey-server</artifactId>
+                <groupId>com.sun.jersey</groupId>
+              </exclusion>
+              <exclusion>
+                <artifactId>core</artifactId>
+                <groupId>org.eclipse.jdt</groupId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+      <repositories>
+        <repository>
+          <id>mapr-releases</id>
+          <url>http://repository.mapr.com/maven/</url>
+          <snapshots><enabled>false</enabled></snapshots>
+          <releases><enabled>true</enabled></releases>
+        </repository>
+      </repositories>
+    </profile>
+    <profile>
+      <id>cdh4</id>
+      <properties>
+        <alt-hadoop>cdh4</alt-hadoop>
+      </properties>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.0.0-cdh4.4.0</version>
+            <exclusions>
+              <exclusion>
+                <artifactId>commons-logging</artifactId>
+                <groupId>commons-logging</groupId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+      <repositories>
+        <repository>
+          <id>cloudera</id>
+          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+        </repository>
+      </repositories>
+    </profile>
+    <profile>
+      <id>hdp</id>
+      <properties>
+        <alt-hadoop>hdp</alt-hadoop>
+      </properties>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.1.0.2.0.4.0-38</version>
+            <exclusions>
+              <exclusion>
+                <artifactId>commons-logging</artifactId>
+                <groupId>commons-logging</groupId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+      <repositories>
+        <repository>
+          <releases>
+            <enabled>true</enabled>
+            <updatePolicy>always</updatePolicy>
+            <checksumPolicy>warn</checksumPolicy>
+          </releases>
+          <snapshots>
+            <enabled>false</enabled>
+            <updatePolicy>never</updatePolicy>
+            <checksumPolicy>fail</checksumPolicy>
+          </snapshots>
+          <id>HDPReleases</id>
+          <name>HDP Releases</name>
+          <url>http://repo.hortonworks.com/content/repositories/releases</url>
+          <layout>default</layout>
+        </repository>
+      </repositories>
+    </profile>
+  </profiles>
   <modules>
     <module>common</module>
     <module>contrib</module>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4515263d/sandbox/prototype/sqlparser/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/pom.xml b/sandbox/prototype/sqlparser/pom.xml
index f30944c..c15e93c 100644
--- a/sandbox/prototype/sqlparser/pom.xml
+++ b/sandbox/prototype/sqlparser/pom.xml
@@ -92,4 +92,46 @@
       <version>2.7.1</version>
     </dependency>
   </dependencies>
+  <profiles>
+    <profile>
+      <id>default-hadoop</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>mapr</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>cdh4</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hdp</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
 </project>