You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/05/05 13:27:25 UTC

[1/5] drill git commit: DRILL-5428: submit_plan fails after Drill 1.8 script revisions

Repository: drill
Updated Branches:
  refs/heads/master 1e0a14cc9 -> 41ffed50f


DRILL-5428: submit_plan fails after Drill 1.8 script revisions

When the other scripts were updated, submit_plan was not corrected.
After Drill 1.8, drill-config.sh consumes all command line arguments,
finds the --config and --site options, removes them, and places the rest
in the new args array.

This PR updates submit_plan to use the new args array.

The fix was tested on a test cluster: we verified that a physical plan
was submitted and ran.

closes #816


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e1bc44c1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e1bc44c1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e1bc44c1

Branch: refs/heads/master
Commit: e1bc44c178dc1e05967834337033dd0430800630
Parents: 1e0a14c
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue Apr 11 14:42:57 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri May 5 15:41:46 2017 +0300

----------------------------------------------------------------------
 distribution/src/resources/submit_plan | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e1bc44c1/distribution/src/resources/submit_plan
----------------------------------------------------------------------
diff --git a/distribution/src/resources/submit_plan b/distribution/src/resources/submit_plan
index 86434bf..eab7215 100755
--- a/distribution/src/resources/submit_plan
+++ b/distribution/src/resources/submit_plan
@@ -22,4 +22,4 @@ bin=`cd -P "$bin">/dev/null; pwd`
 
 DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/submitter.log -Dlog.query.path=$DRILL_LOG_DIR/submitter_queries.json"
 
-exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.client.QuerySubmitter "$@"
+exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.client.QuerySubmitter "${args[@]}"


[3/5] drill git commit: DRILL-5344: External sort priority queue copier fails with an empty batch

Posted by ar...@apache.org.
DRILL-5344: External sort priority queue copier fails with an empty batch

Unit tests showed that the “priority queue copier” does not handle an
empty batch. This has not been an issue because code elsewhere in the
sort specifically works around this issue. This fix resolves the issue
at the source to avoid the need for future work-arounds.

closes #778


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f1baec3c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f1baec3c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f1baec3c

Branch: refs/heads/master
Commit: f1baec3c6a876c274229d0b323a348a051f2f5b7
Parents: 35bccd0
Author: Paul Rogers <pr...@maprtech.com>
Authored: Fri Mar 10 15:56:18 2017 -0800
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri May 5 15:45:56 2017 +0300

----------------------------------------------------------------------
 .../impl/xsort/managed/PriorityQueueCopierTemplate.java     | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f1baec3c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
index 81856fa..76b178c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
@@ -57,9 +57,12 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
 
     queueSize = 0;
     for (int i = 0; i < size; i++) {
-      vector4.set(i, i, batchGroups.get(i).getNextIndex());
-      siftUp();
-      queueSize++;
+      int index = batchGroups.get(i).getNextIndex();
+      vector4.set(i, i, index);
+      if (index > -1) {
+        siftUp();
+        queueSize++;
+      }
     }
   }
 


[5/5] drill git commit: DRILL-5423: Refactor ScanBatch to allow unit testing record readers

Posted by ar...@apache.org.
DRILL-5423: Refactor ScanBatch to allow unit testing record readers

Refactors ScanBatch to allow unit testing of record reader
implementations, especially the “writer” classes.

See JIRA for details.

closes #811


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/41ffed50
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/41ffed50
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/41ffed50

Branch: refs/heads/master
Commit: 41ffed50fbb9319b4a796555396b88de010cb10b
Parents: 0939485
Author: Paul Rogers <pr...@maprtech.com>
Authored: Sat Apr 8 20:52:04 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri May 5 15:46:01 2017 +0300

----------------------------------------------------------------------
 .../exec/ops/AbstractOperatorExecContext.java   |  90 ++++++++++++++++
 .../apache/drill/exec/ops/OperatorContext.java  |  51 ++--------
 .../drill/exec/ops/OperatorContextImpl.java     | 102 ++++++-------------
 .../drill/exec/ops/OperatorExecContext.java     |  46 +++++++++
 .../drill/exec/ops/OperatorUtilities.java       |  48 +++++++++
 .../drill/exec/physical/impl/BaseRootExec.java  |   5 +-
 .../drill/exec/physical/impl/ScanBatch.java     |  62 ++++++++---
 .../drill/exec/memory/TestAllocators.java       |   7 +-
 .../drill/exec/record/TestRecordIterator.java   |   5 +-
 9 files changed, 280 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
new file mode 100644
index 0000000..a517fdf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Implementation of {@link OperatorExecContext} that provides services
+ * needed by most run-time operators. Excludes services that need the
+ * entire Drillbit. Allows easy testing of operator code that uses this
+ * interface.
+ */
+
+public class AbstractOperatorExecContext implements OperatorExecContext {
+
+  protected final BufferAllocator allocator;
+  protected final ExecutionControls executionControls;
+  protected final PhysicalOperator popConfig;
+  protected final BufferManager manager;
+  protected final OperatorStatReceiver statsWriter;
+
+  public AbstractOperatorExecContext(BufferAllocator allocator, PhysicalOperator popConfig,
+                                     ExecutionControls executionControls,
+                                     OperatorStatReceiver stats) {
+    this.allocator = allocator;
+    this.popConfig = popConfig;
+    manager = new BufferManagerImpl(allocator);
+    statsWriter = stats;
+    this.executionControls = executionControls;
+  }
+
+  @Override
+  public DrillBuf replace(DrillBuf old, int newSize) {
+    return manager.replace(old, newSize);
+  }
+
+  @Override
+  public DrillBuf getManagedBuffer() {
+    return manager.getManagedBuffer();
+  }
+
+  @Override
+  public DrillBuf getManagedBuffer(int size) {
+    return manager.getManagedBuffer(size);
+  }
+
+  @Override
+  public ExecutionControls getExecutionControls() { return executionControls; }
+
+  @Override
+  public OperatorStatReceiver getStatsWriter() { return statsWriter; }
+
+  @Override
+  public BufferAllocator getAllocator() {
+    if (allocator == null) {
+      throw new UnsupportedOperationException("Operator context does not have an allocator");
+    }
+    return allocator;
+  }
+
+  @Override
+  public void close() {
+    try {
+      manager.close();
+    } finally {
+      if (allocator != null) {
+        allocator.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index d6045fc..b248d5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,44 +18,28 @@
 package org.apache.drill.exec.ops;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import io.netty.buffer.DrillBuf;
+public interface OperatorContext extends OperatorExecContext {
 
-public abstract class OperatorContext {
+  OperatorStats getStats();
 
-  public abstract DrillBuf replace(DrillBuf old, int newSize);
+  ExecutorService getExecutor();
 
-  public abstract DrillBuf getManagedBuffer();
+  ExecutorService getScanExecutor();
 
-  public abstract DrillBuf getManagedBuffer(int size);
+  ExecutorService getScanDecodeExecutor();
 
-  public abstract BufferAllocator getAllocator();
+  DrillFileSystem newFileSystem(Configuration conf) throws IOException;
 
-  public abstract OperatorStats getStats();
-
-  public abstract ExecutorService getExecutor();
-
-  public abstract ExecutorService getScanExecutor();
-
-  public abstract ExecutorService getScanDecodeExecutor();
-
-  public abstract ExecutionControls getExecutionControls();
-
-  public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
-
-  public abstract DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException;
+  DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException;
 
   /**
    * Run the callable as the given proxy user.
@@ -65,21 +49,6 @@ public abstract class OperatorContext {
    * @param <RESULT> result type
    * @return Future<RESULT> future with the result of calling the callable
    */
-  public abstract <RESULT> ListenableFuture<RESULT> runCallableAs(UserGroupInformation proxyUgi,
+  <RESULT> ListenableFuture<RESULT> runCallableAs(UserGroupInformation proxyUgi,
                                                                   Callable<RESULT> callable);
-
-  public static int getChildCount(PhysicalOperator popConfig) {
-    Iterator<PhysicalOperator> iter = popConfig.iterator();
-    int i = 0;
-    while (iter.hasNext()) {
-      iter.next();
-      i++;
-    }
-
-    if (i == 0) {
-      i = 1;
-    }
-    return i;
-  }
-
-}
+ }

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index c19cc1f..37c609e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.ops;
 
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
@@ -26,10 +24,8 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -39,15 +35,11 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
-class OperatorContextImpl extends OperatorContext implements AutoCloseable {
+class OperatorContextImpl extends AbstractOperatorExecContext implements OperatorContext, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
 
-  private final BufferAllocator allocator;
-  private final ExecutionControls executionControls;
   private boolean closed = false;
-  private final PhysicalOperator popConfig;
   private final OperatorStats stats;
-  private final BufferManager manager;
   private DrillFileSystem fs;
   private final ExecutorService executor;
   private final ExecutorService scanExecutor;
@@ -62,75 +54,43 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   private ListeningExecutorService delegatePool;
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
-    this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
-        popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
-    this.popConfig = popConfig;
-    this.manager = new BufferManagerImpl(allocator);
-
-    OpProfileDef def =
-        new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
-    stats = context.getStats().newOperatorStats(def, allocator);
-    executionControls = context.getExecutionControls();
-    executor = context.getDrillbitContext().getExecutor();
-    scanExecutor = context.getDrillbitContext().getScanExecutor();
-    scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
+    this(popConfig, context, null);
   }
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
       throws OutOfMemoryException {
-    this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
-        popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
-    this.popConfig = popConfig;
-    this.manager = new BufferManagerImpl(allocator);
-    this.stats     = stats;
-    executionControls = context.getExecutionControls();
+    super(context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
+          popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()),
+          popConfig, context.getExecutionControls(), stats);
+    if (stats != null) {
+      this.stats = stats;
+    } else {
+      OpProfileDef def =
+          new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(),
+                           OperatorUtilities.getChildCount(popConfig));
+      this.stats = context.getStats().newOperatorStats(def, allocator);
+    }
     executor = context.getDrillbitContext().getExecutor();
     scanExecutor = context.getDrillbitContext().getScanExecutor();
     scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
   }
 
-  @Override
-  public DrillBuf replace(DrillBuf old, int newSize) {
-    return manager.replace(old, newSize);
-  }
-
-  @Override
-  public DrillBuf getManagedBuffer() {
-    return manager.getManagedBuffer();
-  }
-
-  @Override
-  public DrillBuf getManagedBuffer(int size) {
-    return manager.getManagedBuffer(size);
-  }
-
   // Allow an operator to use the thread pool
   @Override
   public ExecutorService getExecutor() {
     return executor;
   }
+
   @Override
   public ExecutorService getScanExecutor() {
     return scanExecutor;
   }
+
   @Override
   public ExecutorService getScanDecodeExecutor() {
     return scanDecodeExecutor;
   }
 
-  @Override
-  public ExecutionControls getExecutionControls() {
-    return executionControls;
-  }
-
-  @Override
-  public BufferAllocator getAllocator() {
-    if (allocator == null) {
-      throw new UnsupportedOperationException("Operator context does not have an allocator");
-    }
-    return allocator;
-  }
-
   public boolean isClosed() {
     return closed;
   }
@@ -143,20 +103,19 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     }
     logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
 
-    manager.close();
-
-    if (allocator != null) {
-      allocator.close();
-    }
-
-    if (fs != null) {
-      try {
-        fs.close();
-      } catch (IOException e) {
-        throw new DrillRuntimeException(e);
+    closed = true;
+    try {
+      super.close();
+    } finally {
+      if (fs != null) {
+        try {
+          fs.close();
+          fs = null;
+        } catch (IOException e) {
+          throw new DrillRuntimeException(e);
+        }
       }
     }
-    closed = true;
   }
 
   @Override
@@ -201,14 +160,13 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     return fs;
   }
 
-  @Override
-  /*
-     Creates a DrillFileSystem that does not automatically track operator stats.
+  /**
+   * Creates a DrillFileSystem that does not automatically track operator stats.
    */
+  @Override
   public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
     Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
     fs = new DrillFileSystem(conf, null);
     return fs;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
new file mode 100644
index 0000000..4d64aba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Narrowed version of the {@link OperatorContext} used to create an
+ * easy-to-test version of the operator context that excludes services
+ * that require a full Drillbit server.
+ */
+
+public interface OperatorExecContext {
+
+  DrillBuf replace(DrillBuf old, int newSize);
+
+  DrillBuf getManagedBuffer();
+
+  DrillBuf getManagedBuffer(int size);
+
+  BufferAllocator getAllocator();
+
+  ExecutionControls getExecutionControls();
+
+  OperatorStatReceiver getStatsWriter();
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java
new file mode 100644
index 0000000..2e6e759
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+/**
+ * Utility methods, formerly on the OperatorContext class, that work with
+ * operators. The utilities here are available to operators at unit test
+ * time, while methods in OperatorContext are available only in production
+ * code.
+ */
+
+public class OperatorUtilities {
+
+  private OperatorUtilities() { }
+
+  public static int getChildCount(PhysicalOperator popConfig) {
+    Iterator<PhysicalOperator> iter = popConfig.iterator();
+    int count = 0;
+    while (iter.hasNext()) {
+      iter.next();
+      count++;
+    }
+
+    if (count == 0) {
+      count = 1;
+    }
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index f720f8e..d01e294 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorUtilities;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.CloseableRecordBatch;
@@ -44,7 +45,7 @@ public abstract class BaseRootExec implements RootExec {
   public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = fragmentContext.newOperatorContext(config, stats);
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
-        config.getOperatorType(), OperatorContext.getChildCount(config)),
+        config.getOperatorType(), OperatorUtilities.getChildCount(config)),
         oContext.getAllocator());
     fragmentContext.getStats().addOperatorStats(this.stats);
     this.fragmentContext = fragmentContext;
@@ -54,7 +55,7 @@ public abstract class BaseRootExec implements RootExec {
       final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = oContext;
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
-        config.getOperatorType(), OperatorContext.getChildCount(config)),
+        config.getOperatorType(), OperatorUtilities.getChildCount(config)),
       oContext.getAllocator());
     fragmentContext.getStats().addOperatorStats(this.stats);
     this.fragmentContext = fragmentContext;

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 011e751..5a9af39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -35,6 +35,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorExecContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -56,6 +57,7 @@ import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 
 /**
@@ -68,19 +70,14 @@ public class ScanBatch implements CloseableRecordBatch {
   /** Main collection of fields' value vectors. */
   private final VectorContainer container = new VectorContainer();
 
-  /** Fields' value vectors indexed by fields' keys. */
-  private final CaseInsensitiveMap<ValueVector> fieldVectorMap =
-          CaseInsensitiveMap.newHashMap();
-
   private int recordCount;
   private final FragmentContext context;
   private final OperatorContext oContext;
   private Iterator<RecordReader> readers;
   private RecordReader currentReader;
   private BatchSchema schema;
-  private final Mutator mutator = new Mutator();
+  private final Mutator mutator;
   private boolean done = false;
-  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private boolean hasReadNonEmptyFile = false;
   private Map<String, ValueVector> implicitVectors;
   private Iterator<Map<String, String>> implicitColumns;
@@ -98,6 +95,7 @@ public class ScanBatch implements CloseableRecordBatch {
     currentReader = readers.next();
     this.oContext = oContext;
     allocator = oContext.getAllocator();
+    mutator = new Mutator(oContext, allocator, container);
 
     boolean setup = false;
     try {
@@ -158,7 +156,7 @@ public class ScanBatch implements CloseableRecordBatch {
   }
 
   private void clearFieldVectorMap() {
-    for (final ValueVector v : fieldVectorMap.values()) {
+    for (final ValueVector v : mutator.fieldVectorMap().values()) {
       v.clear();
     }
   }
@@ -173,7 +171,7 @@ public class ScanBatch implements CloseableRecordBatch {
       try {
         injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
 
-        currentReader.allocate(fieldVectorMap);
+        currentReader.allocate(mutator.fieldVectorMap());
       } catch (OutOfMemoryException e) {
         logger.debug("Caught Out of Memory Exception", e);
         clearFieldVectorMap();
@@ -204,10 +202,8 @@ public class ScanBatch implements CloseableRecordBatch {
           // If all the files we have read so far are just empty, the schema is not useful
           if (! hasReadNonEmptyFile) {
             container.clear();
-            for (ValueVector v : fieldVectorMap.values()) {
-              v.clear();
-            }
-            fieldVectorMap.clear();
+            clearFieldVectorMap();
+            mutator.clear();
           }
 
           currentReader.close();
@@ -215,7 +211,7 @@ public class ScanBatch implements CloseableRecordBatch {
           implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
           currentReader.setup(oContext, mutator);
           try {
-            currentReader.allocate(fieldVectorMap);
+            currentReader.allocate(mutator.fieldVectorMap());
           } catch (OutOfMemoryException e) {
             logger.debug("Caught OutOfMemoryException");
             clearFieldVectorMap();
@@ -323,11 +319,41 @@ public class ScanBatch implements CloseableRecordBatch {
     return container.getValueAccessorById(clazz, ids);
   }
 
-  private class Mutator implements OutputMutator {
+  /**
+   * Row set mutator implementation provided to record readers created by
+   * this scan batch. Made visible so that tests can create this mutator
+   * without also needing a ScanBatch instance. (This class is really independent
+   * of the ScanBatch, but resides here for historical reasons. This is,
+   * in turn, the only use of the genereated vector readers in the vector
+   * package.)
+   */
+
+  @VisibleForTesting
+  public static class Mutator implements OutputMutator {
     /** Whether schema has changed since last inquiry (via #isNewSchema}).  Is
      *  true before first inquiry. */
     private boolean schemaChanged = true;
 
+    /** Fields' value vectors indexed by fields' keys. */
+    private final CaseInsensitiveMap<ValueVector> fieldVectorMap =
+            CaseInsensitiveMap.newHashMap();
+
+    private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+    private final BufferAllocator allocator;
+
+    private final VectorContainer container;
+
+    private final OperatorExecContext oContext;
+
+    public Mutator(OperatorExecContext oContext, BufferAllocator allocator, VectorContainer container) {
+      this.oContext = oContext;
+      this.allocator = allocator;
+      this.container = container;
+    }
+
+    public Map<String, ValueVector> fieldVectorMap() {
+      return fieldVectorMap;
+    }
 
     @SuppressWarnings("resource")
     @Override
@@ -396,6 +422,10 @@ public class ScanBatch implements CloseableRecordBatch {
     public CallBack getCallBack() {
       return callBack;
     }
+
+    public void clear() {
+      fieldVectorMap.clear();
+    }
   }
 
   @Override
@@ -414,7 +444,7 @@ public class ScanBatch implements CloseableRecordBatch {
     for (final ValueVector v : implicitVectors.values()) {
       v.clear();
     }
-    fieldVectorMap.clear();
+    mutator.clear();
     currentReader.close();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 288e78d..0dc2925 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorUtilities;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -218,7 +219,7 @@ public class TestAllocators extends DrillTest {
 
       // Use some bogus operator type to create a new operator context.
       def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
-          OperatorContext.getChildCount(physicalOperator1));
+          OperatorUtilities.getChildCount(physicalOperator1));
       stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator());
 
       // Add a couple of Operator Contexts
@@ -232,7 +233,7 @@ public class TestAllocators extends DrillTest {
       OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3);
 
       def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
-          OperatorContext.getChildCount(physicalOperator4));
+          OperatorUtilities.getChildCount(physicalOperator4));
       stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator());
       OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats);
       DrillBuf b22 = oContext22.getAllocator().buffer(2000000);
@@ -246,7 +247,7 @@ public class TestAllocators extends DrillTest {
 
       // New fragment starts an operator that allocates an amount within the limit
       def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE,
-          OperatorContext.getChildCount(physicalOperator5));
+          OperatorUtilities.getChildCount(physicalOperator5));
       stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator());
       OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
index c2429b7..847caa5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorUtilities;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -73,7 +74,7 @@ public class TestRecordIterator extends PopUnitTestBase {
     RecordBatch singleBatch = exec.getIncoming();
     PhysicalOperator dummyPop = operatorList.iterator().next();
     OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
-      OperatorContext.getChildCount(dummyPop));
+      OperatorUtilities.getChildCount(dummyPop));
     OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
     RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false);
     int totalRecords = 0;
@@ -130,7 +131,7 @@ public class TestRecordIterator extends PopUnitTestBase {
     RecordBatch singleBatch = exec.getIncoming();
     PhysicalOperator dummyPop = operatorList.iterator().next();
     OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
-      OperatorContext.getChildCount(dummyPop));
+        OperatorUtilities.getChildCount(dummyPop));
     OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
     RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0);
     List<ValueVector> vectors = null;


[4/5] drill git commit: DRILL-4039: Query fails when non-ascii characters are used in string literals

Posted by ar...@apache.org.
DRILL-4039: Query fails when non-ascii characters are used in string literals

closes #825


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0939485d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0939485d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0939485d

Branch: refs/heads/master
Commit: 0939485db40c09525ef3efc7db21cc72f4dd327d
Parents: f1baec3
Author: liyun Liu <ll...@hotmail.com>
Authored: Thu May 4 12:46:58 2017 +0800
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri May 5 15:46:01 2017 +0300

----------------------------------------------------------------------
 .../org/apache/drill/exec/store/hive/schema/DrillHiveTable.java | 5 +++--
 .../apache/drill/exec/planner/sql/parser/DrillParserUtil.java   | 3 ++-
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0939485d/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
index af02c0a..1e65ee0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.hive.schema;
 
 import java.nio.charset.Charset;
+import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -117,7 +118,7 @@ public class DrillHiveTable extends DrillTable{
         int maxLen = TypeInfoUtils.getCharacterLengthForType(pTypeInfo);
         return typeFactory.createTypeWithCharsetAndCollation(
           typeFactory.createSqlType(SqlTypeName.VARCHAR, maxLen), /*input type*/
-          Charset.forName("ISO-8859-1"), /*unicode char set*/
+          Charset.forName(DrillParserUtil.CHARSET),
           SqlCollation.IMPLICIT /* TODO: need to decide if implicit is the correct one */
         );
       }
@@ -126,7 +127,7 @@ public class DrillHiveTable extends DrillTable{
         int maxLen = TypeInfoUtils.getCharacterLengthForType(pTypeInfo);
         return typeFactory.createTypeWithCharsetAndCollation(
             typeFactory.createSqlType(SqlTypeName.CHAR, maxLen), /*input type*/
-            Charset.forName("ISO-8859-1"), /*unicode char set*/
+            Charset.forName(DrillParserUtil.CHARSET),
             SqlCollation.IMPLICIT
         );
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/0939485d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillParserUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillParserUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillParserUtil.java
index b6eb31a..39656e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillParserUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillParserUtil.java
@@ -24,6 +24,7 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.parser.SqlParserUtil;
+import org.apache.calcite.util.Util;
 
 import com.google.common.collect.Lists;
 
@@ -32,7 +33,7 @@ import com.google.common.collect.Lists;
  */
 public class DrillParserUtil {
 
-  public static final String CHARSET = "ISO-8859-1";
+  public static final String CHARSET = Util.getDefaultCharset().name();
 
   public static SqlNode createCondition(SqlNode left, SqlOperator op, SqlNode right) {
 


[2/5] drill git commit: DRILL-5385: Vector serializer fails to read saved SV2

Posted by ar...@apache.org.
DRILL-5385: Vector serializer fails to read saved SV2

Unit testing revealed that the VectorAccessibleSerializable class claims
to serialize SV2s, but, in fact, does not. Actually, it writes them,
but does not read them, resulting in corrupted data on read.

Fortunately, no code appears to serialize sv2s at present. Still, it is
a bug and needs to be fixed.

First task is to add serialization code for the sv2.

That revealed that the recently-added code to save DrillBufs using a
shared buffer had a bug: it relied on the writer index to know how much
data is in the buffer. Turns out sv2 buffers don’t set this index. So,
a new version of the write function takes a write length.

Then, closer inspection of the read code revealed duplicated code. So,
DrillBuf allocation moved into a version of the read function that now
does reading and DrillBuf allocation.

Turns out that value vectors, but not SV2s, can be built from a
DrillBuf. Added a matching constructor to the SV2 class.

Finally, cleaned up the code a bit to make it easier to follow. Also
allowed test code to access the handy timer already present in the code.

closes #800


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/35bccd0e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/35bccd0e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/35bccd0e

Branch: refs/heads/master
Commit: 35bccd0e6dcfdde6f9b4ae92734c82a0bd4242ec
Parents: e1bc44c
Author: Paul Rogers <pr...@maprtech.com>
Authored: Sat Mar 25 19:51:43 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri May 5 15:43:13 2017 +0300

----------------------------------------------------------------------
 .../drill/exec/cache/CachedVectorContainer.java |  3 -
 .../cache/VectorAccessibleSerializable.java     | 97 ++++++++++----------
 .../exec/record/selection/SelectionVector2.java |  6 ++
 .../apache/drill/exec/memory/BaseAllocator.java | 56 ++++++++---
 .../drill/exec/memory/BufferAllocator.java      | 36 +++++++-
 5 files changed, 127 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
index ff6c14b..99d08e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
@@ -61,10 +61,8 @@ public class CachedVectorContainer extends LoopedAbstractDrillSerializable {
     } catch (IOException e) {
       throw new IllegalStateException(e);
     }
-
   }
 
-
   @Override
   public void read(DataInput input) throws IOException {
     int len = input.readInt();
@@ -95,5 +93,4 @@ public class CachedVectorContainer extends LoopedAbstractDrillSerializable {
   public byte[] getData() {
     return data;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 89876af..9d0182f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +28,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
@@ -42,11 +43,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
- * A wrapper around a VectorAccessible. Will serialize a VectorAccessible and write to an OutputStream, or can read
- * from an InputStream and construct a new VectorContainer.
+ * A wrapper around a VectorAccessible. Will serialize a VectorAccessible and
+ * write to an OutputStream, or can read from an InputStream and construct a new
+ * VectorContainer.
  */
 public class VectorAccessibleSerializable extends AbstractStreamSerializable {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
   static final MetricRegistry metrics = DrillMetrics.getRegistry();
   static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
 
@@ -56,6 +57,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
   private int recordCount = -1;
   private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
   private SelectionVector2 sv2;
+  private long timeNs;
 
   private boolean retain = false;
 
@@ -69,8 +71,9 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
   }
 
   /**
-   * Creates a wrapper around batch and sv2 for writing to a stream. sv2 will never be released by this class, and ownership
-   * is maintained by caller.
+   * Creates a wrapper around batch and sv2 for writing to a stream. sv2 will
+   * never be released by this class, and ownership is maintained by caller.
+   *
    * @param batch
    * @param sv2
    * @param allocator
@@ -85,40 +88,48 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
   }
 
   /**
-   * Reads from an InputStream and parses a RecordBatchDef. From this, we construct a SelectionVector2 if it exits
-   * and construct the vectors and add them to a vector container
-   * @param input the InputStream to read from
+   * Reads from an InputStream and parses a RecordBatchDef. From this, we
+   * construct a SelectionVector2 if it exits and construct the vectors and add
+   * them to a vector container
+   *
+   * @param input
+   *          the InputStream to read from
    * @throws IOException
    */
-  @SuppressWarnings("resource")
   @Override
   public void readFromStream(InputStream input) throws IOException {
-    final VectorContainer container = new VectorContainer();
     final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
     recordCount = batchDef.getRecordCount();
     if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
+      readSv2(input);
+    }
+    readVectors(input, batchDef);
+  }
 
-      if (sv2 == null) {
-        sv2 = new SelectionVector2(allocator);
-      }
-      sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
-      sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
-      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+  private void readSv2(InputStream input) throws IOException {
+    if (sv2 != null) {
+      sv2.clear();
     }
+    final int dataLength = recordCount * SelectionVector2.RECORD_SIZE;
+    svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    @SuppressWarnings("resource")
+    DrillBuf buf = allocator.read(dataLength, input);
+    sv2 = new SelectionVector2(allocator, buf, recordCount);
+    buf.release(); // SV2 now owns the buffer
+  }
+
+  @SuppressWarnings("resource")
+  private void readVectors(InputStream input, RecordBatchDef batchDef) throws IOException {
+    final VectorContainer container = new VectorContainer();
     final List<ValueVector> vectorList = Lists.newArrayList();
     final List<SerializedField> fieldList = batchDef.getFieldList();
     for (SerializedField metaData : fieldList) {
       final int dataLength = metaData.getBufferLength();
       final MaterializedField field = MaterializedField.create(metaData);
-      final DrillBuf buf = allocator.buffer(dataLength);
-      final ValueVector vector;
-      try {
-        allocator.read(buf, input, dataLength);
-        vector = TypeHelper.getNewVector(field, allocator);
-        vector.load(metaData, buf);
-      } finally {
-        buf.release();
-      }
+      final DrillBuf buf = allocator.read(dataLength, input);
+      final ValueVector vector = TypeHelper.getNewVector(field, allocator);
+      vector.load(metaData, buf);
+      buf.release(); // Vector now owns the buffer
       vectorList.add(vector);
     }
     container.addCollection(vectorList);
@@ -146,36 +157,24 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
     final DrillBuf[] incomingBuffers = batch.getBuffers();
     final UserBitShared.RecordBatchDef batchDef = batch.getDef();
 
-    /* DrillBuf associated with the selection vector */
-    DrillBuf svBuf = null;
-    Integer svCount =  null;
-
-    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE) {
-      svCount = sv2.getCount();
-      svBuf = sv2.getBuffer(); //this calls retain() internally
-    }
-
     try {
       /* Write the metadata to the file */
       batchDef.writeDelimitedTo(output);
 
       /* If we have a selection vector, dump it to file first */
-      if (svBuf != null) {
-        allocator.write(svBuf, output);
-        sv2.setBuffer(svBuf);
-        svBuf.release(); // sv2 now owns the buffer
-        sv2.setRecordCount(svCount);
+      if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE) {
+        recordCount = sv2.getCount();
+        final int dataLength = recordCount * SelectionVector2.RECORD_SIZE;
+        allocator.write(sv2.getBuffer(false), dataLength, output);
       }
 
       /* Dump the array of ByteBuf's associated with the value vectors */
       for (DrillBuf buf : incomingBuffers) {
-                /* dump the buffer into the OutputStream */
+        /* dump the buffer into the OutputStream */
         allocator.write(buf, output);
       }
 
-      output.flush();
-
-      timerContext.stop();
+      timeNs += timerContext.stop();
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
@@ -192,11 +191,9 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
     }
   }
 
-  public VectorContainer get() {
-    return va;
-  }
+  public VectorContainer get() { return va; }
 
-  public SelectionVector2 getSv2() {
-    return sv2;
-  }
+  public SelectionVector2 getSv2() { return sv2; }
+
+  public long getTimeNs() { return timeNs; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 1a31625..a38a7fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -39,6 +39,12 @@ public class SelectionVector2 implements AutoCloseable {
     this.allocator = allocator;
   }
 
+  public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count) {
+    this.allocator = allocator;
+    buffer = buf;
+    recordCount = count;
+  }
+
   public int getCount() {
     return recordCount;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index ba47998..d872d67 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -68,6 +68,17 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   private final IdentityHashMap<Reservation, Object> reservations;
   private final HistoricalLog historicalLog;
 
+  /**
+   * Disk I/O buffer used for all reads and writes of DrillBufs.
+   * The buffer is allocated when first needed, then reused by all
+   * subsequent I/O operations for the same operator. Since very few
+   * operators do I/O, the number of allocated buffers should be
+   * low. Better would be to hold the buffer at the fragment level
+   * since all operators within a fragment run within a single thread.
+   */
+
+  private byte ioBuffer[];
+
   protected BaseAllocator(
       final BaseAllocator parentAllocator,
       final String name,
@@ -350,6 +361,9 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
         return;
       }
 
+      if (ioBuffer != null) {
+        ioBuffer = null;
+      }
       if (DEBUG) {
         if (!isClosed()) {
           final Object object;
@@ -513,12 +527,8 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
     if (DEBUG) {
       historicalLog.recordEvent("closed");
-      logger.debug(String.format(
-          "closed allocator[%s].",
-          name));
+      logger.debug(String.format("closed allocator[%s].", name));
     }
-
-
   }
 
   @Override
@@ -793,20 +803,20 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     return DEBUG;
   }
 
-  /**
-   * Disk I/O buffer used for all reads and writes of DrillBufs.
-   */
-
-  private byte ioBuffer[];
-
   public byte[] getIOBuffer() {
     if (ioBuffer == null) {
+      // Length chosen to the smallest size that maximizes
+      // disk I/O performance. Smaller sizes slow I/O. Larger
+      // sizes provide no increase in performance.
+      // Revisit from time to time.
+
       ioBuffer = new byte[32*1024];
     }
     return ioBuffer;
   }
 
-  public void read(DrillBuf buf, InputStream in, int length) throws IOException {
+  @Override
+  public void read(DrillBuf buf, int length, InputStream in) throws IOException {
     buf.clear();
 
     byte[] buffer = getIOBuffer();
@@ -817,11 +827,27 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     }
   }
 
+  public DrillBuf read(int length, InputStream in) throws IOException {
+    DrillBuf buf = buffer(length);
+    try {
+      read(buf, length, in);
+      return buf;
+    } catch (IOException e) {
+      buf.release();
+      throw e;
+    }
+  }
+
+  @Override
   public void write(DrillBuf buf, OutputStream out) throws IOException {
+    write(buf, buf.readableBytes(), out);
+  }
+
+  @Override
+  public void write(DrillBuf buf, int length, OutputStream out) throws IOException {
     byte[] buffer = getIOBuffer();
-    int bufLength = buf.readableBytes();
-    for (int posn = 0; posn < bufLength; posn += buffer.length) {
-      int len = Math.min(buffer.length, bufLength - posn);
+    for (int posn = 0; posn < length; posn += buffer.length) {
+      int len = Math.min(buffer.length, length - posn);
       buf.getBytes(posn, buffer, 0, len);
       out.write(buffer, 0, len);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/35bccd0e/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 3c5f57d..bdf3073 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -160,6 +160,9 @@ public interface BufferAllocator extends AutoCloseable {
    * Write the contents of a DrillBuf to a stream. Use this method, rather
    * than calling the DrillBuf.getBytes() method, because this method
    * avoids repeated heap allocation for the intermediate heap buffer.
+   * Uses the reader and writer indexes to determine
+   * the number of bytes to write. Useful only for bufs created using
+   * those indexes.
    *
    * @param buf the Drillbuf to write
    * @param output the output stream
@@ -169,15 +172,42 @@ public interface BufferAllocator extends AutoCloseable {
   public void write(DrillBuf buf, OutputStream out) throws IOException;
 
   /**
+   * Write the contents of a DrillBuf to a stream. Use this method, rather
+   * than calling the DrillBuf.getBytes() method, because this method
+   * avoids repeated heap allocation for the intermediate heap buffer.
+   * Writes the specified number of bytes starting from the head of the
+   * given Drillbuf.
+   *
+   * @param buf the Drillbuf to write
+   * @param length the number of bytes to read. Must be less than or
+   * equal to number of bytes allocated in the buffer.
+   * @param out the output stream
+   * @throws IOException if a write error occurs
+   */
+
+  public void write(DrillBuf buf, int length, OutputStream out) throws IOException;
+
+  /**
    * Read the contents of a DrillBuf from a stream. Use this method, rather
    * than calling the DrillBuf.writeBytes() method, because this method
    * avoids repeated heap allocation for the intermediate heap buffer.
+   * The buffer must have already been allocated.
    *
    * @param buf the buffer to read with space already allocated
-   * @param input input stream from which to read data
-   * @param bufLength number of bytes to read
+   * @param length number of bytes to read
+   * @param in input stream from which to read data
+   * @throws IOException if a read error occurs
+   */
+
+  public void read(DrillBuf buf, int length, InputStream in) throws IOException;
+
+  /**
+   * Reads the specified number of bytes into a new Drillbuf.
+   * @param length number of bytes to read
+   * @param in input stream from which to read data
+   * @return the buffer holding the data read from the stream
    * @throws IOException if a read error occurs
    */
 
-  public void read(DrillBuf buf, InputStream in, int length) throws IOException;
+  public DrillBuf read(int length, InputStream in) throws IOException;
 }