You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by su...@apache.org on 2017/04/21 23:32:28 UTC

[1/3] drill git commit: DRILL-5318: Sub-operator test fixture

Repository: drill
Updated Branches:
  refs/heads/master 381eab668 -> 3e8b01d5b


DRILL-5318: Sub-operator test fixture

This commit depends on:

* DRILL-5323

This PR cannot be accepted (or built) until the above are pulled and
this PR is rebased on top of them. The PR is issued now so that reviews
can be done in parallel.

Provides the following:

* A new OperatorFixture to set up all the objects needed to test at the
sub-operator level. This relies on the refactoring to create the
required interfaces.
* Pulls the config builder code out of the cluster fixture builder so
that configs can be built for sub-operator tests.
* Modifies the QueryBuilder test tool to run a query and get back one
of the new row set objects to allow direct inspection of data returned
from a query.
* Modifies the cluster fixture to create a JDBC connection to the test
cluster. (Using it requires putting the Drill JDBC project on the test
class path, since exec does not depend on JDBC.)

Created a common subclass for the cluster and operator fixtures to
abstract out the allocator and config. Also provides temp directory
support to the operator fixture.

Merged with DRILL-5415 (Improve Fixture Builder to configure client
properties)

Moved row set tests here from DRILL-5323 so that DRILL-5323 is self
contained. (The tests depend on the fixtures defined here.)

Added comments where needed.

Puts code back as it was prior to a code review comment. The code is
redundant, but necessarily so due to code which is specific to several
primitive types.

closes #788


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3e8b01d5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3e8b01d5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3e8b01d5

Branch: refs/heads/master
Commit: 3e8b01d5b0d3013e3811913f0fd6028b22c1ac3f
Parents: 095a660
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue Mar 14 16:18:24 2017 -0700
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Fri Apr 21 14:51:36 2017 -0700

----------------------------------------------------------------------
 .../drill/exec/memory/RootAllocatorFactory.java |  11 +-
 .../apache/drill/exec/record/BatchSchema.java   |   2 +-
 .../drill/exec/record/RecordBatchLoader.java    |  15 +-
 .../exec/record/selection/SelectionVector2.java |  21 +-
 .../java/org/apache/drill/test/BaseFixture.java |  61 +++
 .../org/apache/drill/test/ClientFixture.java    |  45 ++-
 .../org/apache/drill/test/ClusterFixture.java   | 114 +++---
 .../org/apache/drill/test/ConfigBuilder.java    | 153 +++++++
 .../org/apache/drill/test/FixtureBuilder.java   |  31 +-
 .../org/apache/drill/test/OperatorFixture.java  | 331 +++++++++++++++
 .../org/apache/drill/test/QueryBuilder.java     | 118 ++++++
 .../drill/test/rowSet/HyperRowSetImpl.java      |   3 +
 .../apache/drill/test/rowSet/RowSetBuilder.java |   1 +
 .../drill/test/rowSet/test/RowSetTest.java      | 400 +++++++++++++++++++
 .../drill/test/rowSet/test/package-info.java    |  21 +
 15 files changed, 1205 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
index 4fad668..5b42b3d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,8 @@ package org.apache.drill.exec.memory;
 
 import org.apache.drill.common.config.DrillConfig;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class RootAllocatorFactory {
 
   public static final String TOP_LEVEL_MAX_ALLOC = "drill.memory.top.max";
@@ -35,6 +37,11 @@ public class RootAllocatorFactory {
    * @return a new root allocator
    */
   public static BufferAllocator newRoot(final DrillConfig drillConfig) {
-    return new RootAllocator(Math.min(DrillConfig.getMaxDirectMemory(), drillConfig.getLong(TOP_LEVEL_MAX_ALLOC)));
+    return newRoot(drillConfig.getLong(TOP_LEVEL_MAX_ALLOC));
+  }
+
+  @VisibleForTesting
+  public static BufferAllocator newRoot(long maxAlloc) {
+    return new RootAllocator(Math.min(DrillConfig.getMaxDirectMemory(), maxAlloc));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 168995d..e9dcd28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ea99fcb..3801cb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -70,6 +70,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
    * @throws SchemaChangeException
    *   TODO:  Clean:  DRILL-2933  load(...) never actually throws SchemaChangeException.
    */
+  @SuppressWarnings("resource")
   public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException {
     if (logger.isTraceEnabled()) {
       logger.trace("Loading record batch with def {} and data {}", def, buf);
@@ -169,9 +170,9 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
 //  }
 
   @Override
-  public int getRecordCount() {
-    return valueCount;
-  }
+  public int getRecordCount() { return valueCount; }
+
+  public VectorContainer getContainer() { return container; }
 
   @Override
   public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids){
@@ -199,13 +200,9 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
   }
 
   @Override
-  public BatchSchema getSchema() {
-    return schema;
-  }
+  public BatchSchema getSchema() { return schema; }
 
-  public void resetRecordCount() {
-    valueCount = 0;
-  }
+  public void resetRecordCount() { valueCount = 0; }
 
   /**
    * Clears this loader, which clears the internal vector container (see

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index dcf9a7d..1a31625 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -133,4 +133,23 @@ public class SelectionVector2 implements AutoCloseable {
   public void close() {
     clear();
   }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("[SV2: recs=");
+    buf.append(recordCount);
+    buf.append(" - ");
+    int n = Math.min(20, recordCount);
+    for (int i = 0; i < n; i++) {
+      if (i > 0) { buf.append("," ); }
+      buf.append((int) getIndex(i));
+    }
+    if (recordCount > n) {
+      buf.append("...");
+      buf.append((int) getIndex(recordCount-1));
+    }
+    buf.append("]");
+    return buf.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/BaseFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseFixture.java
new file mode 100644
index 0000000..02cdbef
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseFixture.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+import com.google.common.io.Files;
+
+/**
+ * Base class for "fixtures." Provides the basics such as the Drill
+ * configuration, a memory allocator and so on.
+ */
+
+public class BaseFixture {
+
+  protected DrillConfig config;
+  protected BufferAllocator allocator;
+
+  /**
+   * Create a temp directory to store the given <i>dirName</i>. Directory will
+   * be deleted on exit. Directory is created if it does not exist.
+   *
+   * @param dirName directory name
+   * @return Full path including temp parent directory and given directory name.
+   */
+
+  public static File getTempDir(final String dirName) {
+    final File dir = Files.createTempDir();
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        FileUtils.deleteQuietly(dir);
+      }
+    });
+    File tempDir = new File(dir, dirName);
+    tempDir.mkdirs();
+    return tempDir;
+  }
+
+  public BufferAllocator allocator() { return allocator; }
+  public DrillConfig config() { return config; }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index 25dab4f..a63a287 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -34,6 +34,13 @@ import org.apache.drill.exec.testing.ControlsInjectionUtil;
 import org.apache.drill.test.ClusterFixture.FixtureTestServices;
 import org.apache.drill.test.QueryBuilder.QuerySummary;
 
+/**
+ * Represents a Drill client. Provides many useful test-specific operations such
+ * as setting system options, running queries, and using the {@link TestBuilder}
+ * class.
+ * @see ExampleTest ExampleTest for usage examples
+ */
+
 public class ClientFixture implements AutoCloseable {
 
   public static class ClientBuilder {
@@ -45,21 +52,23 @@ public class ClientFixture implements AutoCloseable {
       this.cluster = cluster;
       clientProps = cluster.getClientProps();
     }
+
     /**
      * Specify an optional client property.
      * @param key property name
      * @param value property value
      * @return this builder
      */
-    public ClientBuilder property( String key, Object value ) {
-      if ( clientProps == null ) {
-        clientProps = new Properties( );
+
+    public ClientBuilder property(String key, Object value) {
+      if (clientProps == null) {
+        clientProps = new Properties();
       }
       clientProps.put(key, value);
       return this;
     }
 
-    public ClientFixture build( ) {
+    public ClientFixture build() {
       try {
         return new ClientFixture(this);
       } catch (RpcException e) {
@@ -81,17 +90,17 @@ public class ClientFixture implements AutoCloseable {
     // Create a client.
 
     if (cluster.usesZK()) {
-      client = new DrillClient(cluster.config( ));
+      client = new DrillClient(cluster.config());
     } else {
-      client = new DrillClient(cluster.config( ), cluster.serviceSet( ).getCoordinator());
+      client = new DrillClient(cluster.config(), cluster.serviceSet().getCoordinator());
     }
     client.connect(builder.clientProps);
     cluster.clients.add(this);
   }
 
   public DrillClient client() { return client; }
-  public ClusterFixture cluster( ) { return cluster; }
-  public BufferAllocator allocator( ) { return cluster.allocator( ); }
+  public ClusterFixture cluster() { return cluster; }
+  public BufferAllocator allocator() { return client.getAllocator(); }
 
   /**
    * Set a runtime option.
@@ -101,14 +110,14 @@ public class ClientFixture implements AutoCloseable {
    * @throws RpcException
    */
 
-  public void alterSession(String key, Object value ) {
-    String sql = "ALTER SESSION SET `" + key + "` = " + ClusterFixture.stringify( value );
-    runSqlSilently( sql );
+  public void alterSession(String key, Object value) {
+    String sql = "ALTER SESSION SET `" + key + "` = " + ClusterFixture.stringify(value);
+    runSqlSilently(sql);
   }
 
-  public void alterSystem(String key, Object value ) {
-    String sql = "ALTER SYSTEM SET `" + key + "` = " + ClusterFixture.stringify( value );
-    runSqlSilently( sql );
+  public void alterSystem(String key, Object value) {
+    String sql = "ALTER SYSTEM SET `" + key + "` = " + ClusterFixture.stringify(value);
+    runSqlSilently(sql);
   }
 
   /**
@@ -160,17 +169,17 @@ public class ClientFixture implements AutoCloseable {
       if (trimmedQuery.isEmpty()) {
         continue;
       }
-      queryBuilder( ).sql(trimmedQuery).print();
+      queryBuilder().sql(trimmedQuery).print();
     }
   }
 
   @Override
-  public void close( ) {
+  public void close() {
     if (client == null) {
       return;
     }
     try {
-      client.close( );
+      client.close();
     } finally {
       client = null;
       cluster.clients.remove(this);
@@ -198,7 +207,7 @@ public class ClientFixture implements AutoCloseable {
    */
 
   public ProfileParser parseProfile(String queryId) throws IOException {
-    File file = new File(cluster.getProfileDir(), queryId + ".sys.drill" );
+    File file = new File(cluster.getProfileDir(), queryId + ".sys.drill");
     return new ProfileParser(file);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 90ce206..0ce337d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -20,27 +20,28 @@ package org.apache.drill.test;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.DrillTestWrapper.TestServices;
 import org.apache.drill.QueryTestUtil;
-import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.TestBuilder;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.server.Drillbit;
@@ -57,7 +58,6 @@ import org.apache.drill.exec.util.TestUtilities;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
-import com.google.common.io.Files;
 import com.google.common.io.Resources;
 
 /**
@@ -67,7 +67,7 @@ import com.google.common.io.Resources;
  * creates the requested Drillbit and client.
  */
 
-public class ClusterFixture implements AutoCloseable {
+public class ClusterFixture extends BaseFixture implements AutoCloseable {
   // private static final org.slf4j.Logger logger =
   // org.slf4j.LoggerFactory.getLogger(ClientFixture.class);
   public static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
@@ -123,10 +123,8 @@ public class ClusterFixture implements AutoCloseable {
 
   public static final String DEFAULT_BIT_NAME = "drillbit";
 
-  private DrillConfig config;
   private Map<String, Drillbit> bits = new HashMap<>();
   private Drillbit defaultDrillbit;
-  private BufferAllocator allocator;
   private boolean ownsZK;
   private ZookeeperHelper zkHelper;
   private RemoteServiceSet serviceSet;
@@ -199,7 +197,7 @@ public class ClusterFixture implements AutoCloseable {
       // combining locally-set properties and a config file: it is one
       // or the other.
 
-      if (builder.configProps == null) {
+      if (builder.configBuilder().hasResource()) {
         throw new IllegalArgumentException("Cannot specify a local ZK while using an external config file.");
       }
       builder.configProperty(ExecConstants.ZK_CONNECTION, zkConnect);
@@ -216,13 +214,7 @@ public class ClusterFixture implements AutoCloseable {
     // Because of the way DrillConfig works, we can set the ZK
     // connection string only if a property set is provided.
 
-    if (builder.configResource != null) {
-      config = DrillConfig.create(builder.configResource);
-    } else if (builder.configProps != null) {
-      config = configProperties(builder.configProps);
-    } else {
-      throw new IllegalStateException("Configuration was not provided.");
-    }
+    config = builder.configBuilder.build();
 
     if (builder.usingZk) {
       // Distribute drillbit using ZK (in-process or external)
@@ -341,39 +333,10 @@ public class ClusterFixture implements AutoCloseable {
     }
   }
 
-  private DrillConfig configProperties(Properties configProps) {
-    Properties stringProps = new Properties();
-    Properties collectionProps = new Properties();
-
-    // Filter out the collection type configs and other configs which can be converted to string.
-    for(Entry<Object, Object> entry : configProps.entrySet()) {
-      if(entry.getValue() instanceof Collection<?>) {
-        collectionProps.put(entry.getKey(), entry.getValue());
-      } else {
-        stringProps.setProperty(entry.getKey().toString(), entry.getValue().toString());
-      }
-    }
-
-    // First create a DrillConfig based on string properties.
-    Config drillConfig = DrillConfig.create(stringProps);
-
-    // Then add the collection properties inside the DrillConfig. Below call to withValue returns
-    // a new reference. Considering mostly properties will be of string type, doing this
-    // later will be less expensive as compared to doing it for all the properties.
-    for(Entry<Object, Object> entry : collectionProps.entrySet()) {
-      drillConfig = drillConfig.withValue(entry.getKey().toString(),
-        ConfigValueFactory.fromAnyRef(entry.getValue()));
-    }
-
-    return new DrillConfig(drillConfig, true);
-  }
-
   public Drillbit drillbit() { return defaultDrillbit; }
   public Drillbit drillbit(String name) { return bits.get(name); }
   public Collection<Drillbit> drillbits() { return bits.values(); }
   public RemoteServiceSet serviceSet() { return serviceSet; }
-  public BufferAllocator allocator() { return allocator; }
-  public DrillConfig config() { return config; }
   public File getDfsTestTmpDir() { return dfsTestTempDir; }
 
   public ClientFixture.ClientBuilder clientBuilder() {
@@ -392,6 +355,39 @@ public class ClusterFixture implements AutoCloseable {
   }
 
   /**
+   * Return a JDBC connection to the default (first) Drillbit.
+   * Note that this code requires special setup of the test code.
+   * Tests in the "exec" package do not normally have visibility
+   * to the Drill JDBC driver. So, the test must put that code
+   * on the class path manually in order for this code to load the
+   * JDBC classes. The caller is responsible for closing the JDBC
+   * connection before closing the cluster. (An enhancement is to
+   * do the close automatically as is done for clients.)
+   *
+   * @return a JDBC connection to the default Drillbit
+   */
+
+  public Connection jdbcConnection() {
+    try {
+      Class.forName("org.apache.drill.jdbc.Driver");
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+    String connStr = "jdbc:drill:";
+    if (usesZK()) {
+      connStr += "zk=" + zkHelper.getConnectionString();
+    } else {
+      DrillbitEndpoint ep = drillbit().getContext().getEndpoint();
+      connStr += "drillbit=" + ep.getAddress() + ":" + ep.getUserPort();
+    }
+    try {
+      return DriverManager.getConnection(connStr);
+    } catch (SQLException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
    * Close the clients, Drillbits, allocator and
    * Zookeeper. Checks for exceptions. If an exception occurs,
    * continues closing, suppresses subsequent exceptions, and
@@ -562,10 +558,13 @@ public class ClusterFixture implements AutoCloseable {
   public static final String EXPLAIN_PLAN_JSON = "json";
 
   public static FixtureBuilder builder() {
-     return new FixtureBuilder()
-         .configProps(FixtureBuilder.defaultProps())
+    FixtureBuilder builder = new FixtureBuilder()
          .sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, MAX_WIDTH_PER_NODE)
          ;
+    Properties props = new Properties();
+    props.putAll(ClusterFixture.TEST_CONFIGURATIONS);
+    builder.configBuilder.configProps(props);
+    return builder;
   }
 
   /**
@@ -689,27 +688,6 @@ public class ClusterFixture implements AutoCloseable {
   }
 
   /**
-   * Create a temp directory to store the given <i>dirName</i>. Directory will
-   * be deleted on exit. Directory is created if it does not exist.
-   *
-   * @param dirName directory name
-   * @return Full path including temp parent directory and given directory name.
-   */
-
-  public static File getTempDir(final String dirName) {
-    final File dir = Files.createTempDir();
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        FileUtils.deleteQuietly(dir);
-      }
-    });
-    File tempDir = new File(dir, dirName);
-    tempDir.mkdirs();
-    return tempDir;
-  }
-
-  /**
    * Create a temporary directory which will be removed when the
    * cluster closes.
    *

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java
new file mode 100644
index 0000000..82f6196
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ConfigBuilder.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.test;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.DrillConfig;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+/**
+ * Builds a {@link DrillConfig} for use in tests. Use this when a config
+ * is needed by itself, separate from an embedded Drillbit.
+ */
+public class ConfigBuilder {
+
+  protected String configResource;
+  protected Properties configProps;
+
+  /**
+   * Use the given configuration properties as overrides.
+   * @param configProps a collection of config properties
+   * @return this builder
+   * @see #put(String, Object)
+   */
+
+  public ConfigBuilder configProps(Properties configProps) {
+    if (hasResource()) {
+      // Drill provides no constructor for this use case.
+      throw new IllegalArgumentException( "Cannot provide both a config resource and config properties.");
+    }
+    if (this.configProps == null) {
+      this.configProps = configProps;
+    } else {
+      this.configProps.putAll(configProps);
+    }
+    return this;
+  }
+
+  /**
+   * Use the given configuration file, stored as a resource, to initialize
+   * the Drill config. Note that the resource file should have the two
+   * following settings to work as a config for an embedded Drillbit:
+   * <pre><code>
+   * drill.exec.sys.store.provider.local.write : false,
+   * drill.exec.http.enabled : false
+   * </code></pre>
+   * It may be more convenient to add your settings to the default
+   * config settings with {@link #put(String, Object)}.
+   * @param configResource path to the file that contains the
+   * config file to be read
+   * @return this builder
+   * @see #put(String, Object)
+   */
+
+  public ConfigBuilder resource(String configResource) {
+
+    if (configProps != null) {
+      // Drill provides no constructor for this use case.
+      throw new IllegalArgumentException( "Cannot provide both a config resource and config properties.");
+    }
+
+    // TypeSafe gets unhappy about a leading slash, but other functions
+    // require it. Silently discard the leading slash if given to
+    // preserve the test writer's sanity.
+
+    this.configResource = ClusterFixture.trimSlash(configResource);
+    return this;
+  }
+
+  /**
+   * Add an additional boot-time property for the embedded Drillbit.
+   * @param key config property name
+   * @param value property value
+   * @return this builder
+   */
+
+  public ConfigBuilder put(String key, Object value) {
+    if (hasResource()) {
+      // Drill provides no constructor for this use case.
+      throw new IllegalArgumentException( "Cannot provide both a config resource and config properties.");
+    }
+    if (configProps == null) {
+      configProps = new Properties();
+    }
+    configProps.put(key, value.toString());
+    return this;
+  }
+
+  public DrillConfig build() {
+
+    // Create a config
+    // Because of the way DrillConfig works, we can set the ZK
+    // connection string only if a property set is provided.
+
+    if (hasResource()) {
+      return DrillConfig.create(configResource);
+    } else if (configProps != null) {
+      return constructConfig();
+    } else {
+      return DrillConfig.create();
+    }
+  }
+
+  private DrillConfig constructConfig() {
+    Properties stringProps = new Properties();
+    Properties collectionProps = new Properties();
+
+    // Filter out the collection type configs and other configs which can be converted to string.
+    for(Entry<Object, Object> entry : configProps.entrySet()) {
+      if(entry.getValue() instanceof Collection<?>) {
+        collectionProps.put(entry.getKey(), entry.getValue());
+      } else {
+        stringProps.setProperty(entry.getKey().toString(), entry.getValue().toString());
+      }
+    }
+
+    // First create a DrillConfig based on string properties.
+    Config drillConfig = DrillConfig.create(stringProps);
+
+    // Then add the collection properties inside the DrillConfig. Below call to withValue returns
+    // a new reference. Considering mostly properties will be of string type, doing this
+    // later will be less expensive as compared to doing it for all the properties.
+    for(Entry<Object, Object> entry : collectionProps.entrySet()) {
+      drillConfig = drillConfig.withValue(entry.getKey().toString(),
+        ConfigValueFactory.fromAnyRef(entry.getValue()));
+    }
+
+    return new DrillConfig(drillConfig, true);
+  }
+
+  public boolean hasResource() {
+    return configResource != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
index f6106ff..b305609 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
@@ -19,7 +19,6 @@ package org.apache.drill.test;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
@@ -51,14 +50,7 @@ public class FixtureBuilder {
   public static final int DEFAULT_SERVER_RPC_THREADS = 10;
   public static final int DEFAULT_SCAN_THREADS = 8;
 
-  public static Properties defaultProps() {
-    Properties props = new Properties();
-    props.putAll(ClusterFixture.TEST_CONFIGURATIONS);
-    return props;
-  }
-
-  protected String configResource;
-  protected Properties configProps;
+  protected ConfigBuilder configBuilder = new ConfigBuilder();
   protected List<RuntimeOption> sessionOptions;
   protected List<RuntimeOption> systemOptions;
   protected int bitCount = 1;
@@ -71,16 +63,12 @@ public class FixtureBuilder {
   protected Properties clientProps;
 
   /**
-   * Use the given configuration properties to start the embedded Drillbit.
-   * @param configProps a collection of config properties
-   * @return this builder
-   * @see {@link #configProperty(String, Object)}
+   * The configuration builder which this fixture builder uses.
+   * @return the configuration builder for use in setting "advanced"
+   * configuration options.
    */
 
-  public FixtureBuilder configProps(Properties configProps) {
-    this.configProps = configProps;
-    return this;
-  }
+  public ConfigBuilder configBuilder() { return configBuilder; }
 
   /**
    * Use the given configuration file, stored as a resource, to start the
@@ -104,7 +92,7 @@ public class FixtureBuilder {
     // require it. Silently discard the leading slash if given to
     // preserve the test writer's sanity.
 
-    this.configResource = ClusterFixture.trimSlash(configResource);
+    configBuilder.resource(ClusterFixture.trimSlash(configResource));
     return this;
   }
 
@@ -116,10 +104,7 @@ public class FixtureBuilder {
    */
 
   public FixtureBuilder configProperty(String key, Object value) {
-    if (configProps == null) {
-      configProps = defaultProps();
-    }
-    configProps.put(key, value);
+    configBuilder.put(key, value.toString());
     return this;
   }
 
@@ -131,7 +116,7 @@ public class FixtureBuilder {
    * @return this builder
    */
   public FixtureBuilder configClientProperty(String key, Object value) {
-    if(clientProps == null) {
+    if (clientProps == null) {
       clientProps = new Properties();
     }
     clientProps.put(key, value.toString());

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
new file mode 100644
index 0000000..2c72c3c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.compile.CodeCompiler;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.ops.FragmentExecContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperExecContextImpl;
+import org.apache.drill.exec.ops.OperatorStatReceiver;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.server.options.BaseOptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.HyperRowSetImpl;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+
+/**
+ * Test fixture for operator and (especially) "sub-operator" tests.
+ * These are tests that are done without the full Drillbit server.
+ * Instead, this fixture creates a test fixture runtime environment
+ * that provides "real" implementations of the classes required by
+ * operator internals, but with implementations tuned to the test
+ * environment. The services available from this fixture are:
+ * <ul>
+ * <li>Configuration (DrillConfig)</li>
+ * <li>Memory allocator</li>
+ * <li>Code generation (compilers, code cache, etc.)</li>
+ * <li>Read-only version of system and session options (which
+ * are set when creating the fixture).</li>
+ * <li>Write-only version of operator stats (which are easy to
+ * read to verify in tests).</li>
+ * </ul>
+ * What is <b>not</b> provided is anything that depends on a live server:
+ * <ul>
+ * <li>Network endpoints.</li>
+ * <li>Persistent storage.</li>
+ * <li>ZK access.</li>
+ * <li>Multiple threads of execution.</li>
+ * </ul>
+ */
+
+public class OperatorFixture extends BaseFixture implements AutoCloseable {
+
+  /**
+   * Builds an operator fixture based on a set of config options and system/session
+   * options. Both the config builder and the option set are created eagerly
+   * and handed to the fixture as-is by {@link #build()}.
+   */
+
+  public static class OperatorFixtureBuilder {
+    ConfigBuilder configBuilder = new ConfigBuilder();
+    TestOptionSet options = new TestOptionSet();
+
+    /**
+     * @return the config builder used to create the fixture's DrillConfig
+     */
+
+    public ConfigBuilder configBuilder() {
+      return configBuilder;
+    }
+
+    /**
+     * @return the option set that the fixture will expose; callers may add
+     * option values to it before calling {@link #build()}
+     */
+
+    public TestOptionSet options() {
+      return options;
+    }
+
+    /**
+     * @return a new fixture configured from this builder
+     */
+
+    public OperatorFixture build() {
+      return new OperatorFixture(this);
+    }
+  }
+
+  /**
+   * Test-time implementation of the system and session options. Provides
+   * a simple storage and a simple set interface, then implements the standard
+   * system and session option read interface. All values are stored as
+   * SYSTEM-type options keyed by option name.
+   */
+
+  public static class TestOptionSet extends BaseOptionManager {
+
+    private final Map<String, OptionValue> values = new HashMap<>();
+
+    public TestOptionSet() {
+      // Crashes in FunctionImplementationRegistry if not set
+      set(ExecConstants.CAST_TO_NULLABLE_NUMERIC, false);
+      // Crashes in the Dynamic UDF code if not disabled
+      set(ExecConstants.USE_DYNAMIC_UDFS_KEY, false);
+    }
+
+    /** Sets a numeric option; the int is widened and stored as a long option. */
+
+    public void set(String key, int value) {
+      set(key, (long) value);
+    }
+
+    /** Sets (or replaces) a long-valued option. */
+
+    public void set(String key, long value) {
+      values.put(key, OptionValue.createLong(OptionType.SYSTEM, key, value));
+    }
+
+    /** Sets (or replaces) a boolean-valued option. */
+
+    public void set(String key, boolean value) {
+      values.put(key, OptionValue.createBoolean(OptionType.SYSTEM, key, value));
+    }
+
+    /** Sets (or replaces) a double-valued option. */
+
+    public void set(String key, double value) {
+      values.put(key, OptionValue.createDouble(OptionType.SYSTEM, key, value));
+    }
+
+    /** Sets (or replaces) a string-valued option. */
+
+    public void set(String key, String value) {
+      values.put(key, OptionValue.createString(OptionType.SYSTEM, key, value));
+    }
+
+    /**
+     * Looks up an option by name.
+     *
+     * @return the stored value, or null if the option was never set
+     */
+
+    @Override
+    public OptionValue getOption(String name) {
+      return values.get(name);
+    }
+  }
+
+  /**
+   * Provide a simplified test-time code generation context that
+   * uses the same code generation mechanism as the full Drill, but
+   * provide test-specific versions of various other services.
+   */
+
+  public static class TestCodeGenContext implements FragmentExecContext {
+
+    private final DrillConfig config;
+    private final OptionSet options;
+    private final CodeCompiler compiler;
+    private final FunctionImplementationRegistry functionRegistry;
+    // Optional; stays null unless setExecutionControls() is called.
+    private ExecutionControls controls;
+
+    /**
+     * Creates the context, building a function registry from a class path
+     * pre-scan and a code compiler from the given config and options.
+     *
+     * @param config the Drill configuration to expose and to compile with
+     * @param options the option set to expose and to compile with
+     */
+
+    public TestCodeGenContext(DrillConfig config, OptionSet options) {
+      this.config = config;
+      this.options = options;
+      ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
+      functionRegistry = new FunctionImplementationRegistry(config, classpathScan, options);
+      compiler = new CodeCompiler(config, options);
+    }
+
+    /**
+     * Installs the execution controls returned by
+     * {@link #getExecutionControls()}.
+     */
+
+    public void setExecutionControls(ExecutionControls controls) {
+      this.controls = controls;
+    }
+
+    @Override
+    public FunctionImplementationRegistry getFunctionRegistry() {
+      return functionRegistry;
+    }
+
+    @Override
+    public OptionSet getOptionSet() {
+      return options;
+    }
+
+    @Override
+    public <T> T getImplementationClass(final ClassGenerator<T> cg)
+        throws ClassTransformationException, IOException {
+      return getImplementationClass(cg.getCodeGenerator());
+    }
+
+    @Override
+    public <T> T getImplementationClass(final CodeGenerator<T> cg)
+        throws ClassTransformationException, IOException {
+      return compiler.createInstance(cg);
+    }
+
+    @Override
+    public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
+        throws ClassTransformationException, IOException {
+      return getImplementationClass(cg.getCodeGenerator(), instanceCount);
+    }
+
+    @Override
+    public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
+        throws ClassTransformationException, IOException {
+      return compiler.createInstances(cg, instanceCount);
+    }
+
+    /** Always true: this test context never asks the caller to stop. */
+
+    @Override
+    public boolean shouldContinue() {
+      return true;
+    }
+
+    /**
+     * @return the controls set via {@link #setExecutionControls}, or null
+     * if none were set
+     */
+
+    @Override
+    public ExecutionControls getExecutionControls() {
+      return controls;
+    }
+
+    @Override
+    public DrillConfig getConfig() {
+      return config;
+    }
+  }
+
+  /**
+   * Implements a write-only version of the stats collector for use by operators,
+   * then provides simplified test-time accessors to get the stats values when
+   * validating code in tests. Every metric, long or double, is stored as a
+   * double keyed by metric ID; a metric that was never written reads as 0.
+   */
+
+  public static class MockStats implements OperatorStatReceiver {
+
+    public Map<Integer,Double> stats = new HashMap<>();
+
+    @Override
+    public void addLongStat(MetricDef metric, long value) {
+      setStat(metric, getStat(metric) + value);
+    }
+
+    @Override
+    public void addDoubleStat(MetricDef metric, double value) {
+      setStat(metric, getStat(metric) + value);
+    }
+
+    @Override
+    public void setLongStat(MetricDef metric, long value) {
+      setStat(metric, value);
+    }
+
+    @Override
+    public void setDoubleStat(MetricDef metric, double value) {
+      setStat(metric, value);
+    }
+
+    /**
+     * @return the current value recorded for the metric, or 0 if the metric
+     * has not been written
+     */
+
+    public double getStat(MetricDef metric) {
+      return getStat(metric.metricId());
+    }
+
+    private double getStat(int metricId) {
+      Double value = stats.get(metricId);
+      return value == null ? 0 : value;
+    }
+
+    private void setStat(MetricDef metric, double value) {
+      setStat(metric.metricId(), value);
+    }
+
+    private void setStat(int metricId, double value) {
+      stats.put(metricId, value);
+    }
+  }
+
+  private final TestOptionSet options;
+  private final TestCodeGenContext context;
+  private final OperatorStatReceiver stats;
+
+  /**
+   * Creates the fixture: builds the config, creates a root allocator from
+   * it, and wires up the option set, code generation context and mock stats.
+   *
+   * @param builder source of the config builder and option set
+   */
+
+  protected OperatorFixture(OperatorFixtureBuilder builder) {
+    config = builder.configBuilder().build();
+    allocator = RootAllocatorFactory.newRoot(config);
+    options = builder.options();
+    context = new TestCodeGenContext(config, options);
+    stats = new MockStats();
+  }
+
+  /**
+   * @return the option set backing this fixture
+   */
+
+  public TestOptionSet options() { return options; }
+
+  /**
+   * @return the code generation context created for this fixture
+   */
+
+  public FragmentExecContext codeGenContext() { return context; }
+
+  /**
+   * Closes the allocator created by this fixture.
+   */
+
+  @Override
+  public void close() throws Exception {
+    allocator.close();
+  }
+
+  /**
+   * Creates a fixture builder whose config already has
+   * {@code ExecConstants.UDF_DISABLE_DYNAMIC} set to true.
+   *
+   * @return the pre-configured builder
+   */
+
+  public static OperatorFixtureBuilder builder() {
+    OperatorFixtureBuilder builder = new OperatorFixtureBuilder();
+    builder.configBuilder()
+      // Required to avoid Dynamic UDF calls for missing or
+      // ambiguous functions.
+      .put(ExecConstants.UDF_DISABLE_DYNAMIC, true);
+    return builder;
+  }
+
+  /**
+   * @return a fixture built from {@link #builder()} with no further
+   * customization
+   */
+
+  public static OperatorFixture standardFixture() {
+    return builder().build();
+  }
+
+  /**
+   * Creates an operator execution context that uses this fixture's code
+   * generation context, allocator and stats receiver.
+   *
+   * @param opDefn the operator definition passed through to the context
+   * @return the new context (its final constructor argument is passed as null)
+   */
+
+  public OperExecContext newOperExecContext(PhysicalOperator opDefn) {
+    return new OperExecContextImpl(context, allocator, stats, opDefn, null);
+  }
+
+  /**
+   * @return a row set builder for the schema, using this fixture's allocator
+   */
+
+  public RowSetBuilder rowSetBuilder(BatchSchema schema) {
+    return new RowSetBuilder(allocator, schema);
+  }
+
+  /**
+   * @return a new direct row set for the schema, using this fixture's allocator
+   */
+
+  public ExtendableRowSet rowSet(BatchSchema schema) {
+    return new DirectRowSet(allocator, schema);
+  }
+
+  /**
+   * Wraps an existing vector container in the row set implementation that
+   * matches the container's selection vector mode: a hyper row set for
+   * FOUR_BYTE, a direct row set for NONE and an indirect row set for TWO_BYTE.
+   *
+   * @param container the container to wrap
+   * @return the row set wrapping the container
+   * @throws IllegalStateException if the selection vector mode is none of
+   * the above
+   */
+
+  public RowSet wrap(VectorContainer container) {
+    switch (container.getSchema().getSelectionVectorMode()) {
+    case FOUR_BYTE:
+      return new HyperRowSetImpl(allocator(), container, container.getSelectionVector4());
+    case NONE:
+      return new DirectRowSet(allocator(), container);
+    case TWO_BYTE:
+      return new IndirectRowSet(allocator(), container);
+    default:
+      throw new IllegalStateException( "Unexpected selection mode" );
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index f9837ff..f2a27c8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -31,11 +31,13 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.client.PrintingResultsListener;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.RpcException;
@@ -46,6 +48,9 @@ import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.RowSetReader;
 
 import com.google.common.base.Preconditions;
 
@@ -271,6 +276,119 @@ public class QueryBuilder {
   }
 
   /**
+   * Run the query and return the first result set as a
+   * {@link DirectRowSet} object that can be inspected directly
+   * by the code using a {@link RowSetReader}.
+   * <p>
+   * An enhancement is to provide a way to read a series of result
+   * batches as row sets.
+   * @return a row set that represents the first batch returned from
+   * the query
+   * @throws RpcException if anything goes wrong
+   */
+
+  public DirectRowSet rowSet() throws RpcException {
+
+    // Keep only the first non-empty batch; every other batch is
+    // released as soon as it is seen.
+
+    QueryDataBatch dataBatch = null;
+    for (QueryDataBatch batch : results()) {
+      if (dataBatch == null  &&  batch.getHeader().getRowCount() != 0) {
+        dataBatch = batch;
+      } else {
+        batch.release();
+      }
+    }
+
+    // No non-empty batch: there is nothing to wrap, so return null.
+
+    if (dataBatch == null) {
+      return null;
+    }
+
+    // Unload the batch and convert to a row set. The batch is released
+    // in a finally block so that it is not leaked when the load (or the
+    // row set construction) fails.
+
+    final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+    try {
+      loader.load(dataBatch.getHeader().getDef(), dataBatch.getData());
+      VectorContainer container = loader.getContainer();
+      container.setRecordCount(loader.getRecordCount());
+      return new DirectRowSet(client.allocator(), container);
+    } catch (SchemaChangeException e) {
+      throw new IllegalStateException(e);
+    } finally {
+      dataBatch.release();
+    }
+  }
+
+  /**
+   * Run the query that is expected to return (at least) one row
+   * with the only (or first) column returning a long value.
+   * The long value cannot be null. The row set holding the result is
+   * cleared before returning, whether or not the value could be read.
+   *
+   * @return the value of the first column of the first row
+   * @throws RpcException if anything goes wrong
+   * @throws IllegalStateException if the query returns no rows
+   */
+
+  public long singletonLong() throws RpcException {
+    RowSet rowSet = rowSet();
+    if (rowSet == null) {
+      throw new IllegalStateException("No rows returned");
+    }
+    try {
+      RowSetReader reader = rowSet.reader();
+      reader.next();
+      return reader.column(0).getLong();
+    } finally {
+      // Clear the row set even if the read above throws.
+      rowSet.clear();
+    }
+  }
+
+  /**
+   * Run the query that is expected to return (at least) one row
+   * with the only (or first) column returning an int value.
+   * The int value cannot be null. The row set holding the result is
+   * cleared before returning, whether or not the value could be read.
+   *
+   * @return the value of the first column of the first row
+   * @throws RpcException if anything goes wrong
+   * @throws IllegalStateException if the query returns no rows
+   */
+
+  public int singletonInt() throws RpcException {
+    RowSet rowSet = rowSet();
+    if (rowSet == null) {
+      throw new IllegalStateException("No rows returned");
+    }
+    try {
+      RowSetReader reader = rowSet.reader();
+      reader.next();
+      return reader.column(0).getInt();
+    } finally {
+      // Clear the row set even if the read above throws.
+      rowSet.clear();
+    }
+  }
+
+  /**
+   * Run the query that is expected to return (at least) one row
+   * with the only (or first) column returning a string value.
+   * The value may be null, in which case a null string is returned.
+   * The row set holding the result is cleared before returning, whether
+   * or not the value could be read.
+   *
+   * @return the value of the first column of the first row, or null if
+   * that column is null
+   * @throws RpcException if anything goes wrong
+   * @throws IllegalStateException if the query returns no rows
+   */
+
+  public String singletonString() throws RpcException {
+    RowSet rowSet = rowSet();
+    if (rowSet == null) {
+      throw new IllegalStateException("No rows returned");
+    }
+    try {
+      RowSetReader reader = rowSet.reader();
+      reader.next();
+      if (reader.column(0).isNull()) {
+        return null;
+      }
+      return reader.column(0).getString();
+    } finally {
+      // Clear the row set even if the read above throws.
+      rowSet.clear();
+    }
+  }
+
+  /**
    * Run the query with the listener provided. Use when the result
    * count will be large, or you don't need the results.
    *

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
index 9df0b23..c7cb1b2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
@@ -228,12 +228,15 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
   /**
    * Selection vector that indexes into the hyper vectors.
    */
+
   private final SelectionVector4 sv4;
+
   /**
    * Collection of hyper vectors in flattened order: a left-to-right,
    * depth first ordering of vectors in maps. Order here corresponds to
    * the order used for column indexes in the row set reader.
    */
+
   private final HyperVectorWrapper<ValueVector> hvw[];
 
   public HyperRowSetImpl(BufferAllocator allocator, VectorContainer container, SelectionVector4 sv4) {

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
index a5b03c8..74e9356 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
@@ -70,6 +70,7 @@ public final class RowSetBuilder {
    *
    * @return this builder
    */
+
   public RowSetBuilder withSv2() {
     withSv2 = true;
     return this;

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/RowSetTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/RowSetTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/RowSetTest.java
new file mode 100644
index 0000000..8d9179b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/RowSetTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.vector.accessor.ArrayReader;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetSchema;
+import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
+import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+
+public class RowSetTest {
+
+  private static OperatorFixture fixture;
+
+  /**
+   * Creates the standard operator fixture that is shared by all tests in
+   * this class.
+   */
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    fixture = OperatorFixture.standardFixture();
+  }
+
+  /**
+   * Closes the shared fixture, if one was created.
+   */
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // The fixture is still null if setUpBeforeClass() failed. Skip the
+    // close in that case so a NullPointerException is not reported on top
+    // of the original setup error.
+    if (fixture != null) {
+      fixture.close();
+    }
+  }
+
+  /**
+   * Test a simple physical schema with no maps: one required INT column,
+   * one repeated INT column and one nullable VARCHAR column. Verifies the
+   * batch schema, the hierarchical access schema and the physical schema,
+   * all of which should list the same three columns in the same order.
+   */
+
+  @Test
+  public void testSchema() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("c", MinorType.INT)
+        .add("a", MinorType.INT, DataMode.REPEATED)
+        .addNullable("b", MinorType.VARCHAR)
+        .build();
+
+    // Columns keep the order in which they were added to the builder.
+
+    assertEquals("c", batchSchema.getColumn(0).getName());
+    assertEquals("a", batchSchema.getColumn(1).getName());
+    assertEquals("b", batchSchema.getColumn(2).getName());
+
+    RowSetSchema schema = new RowSetSchema(batchSchema);
+    TupleSchema access = schema.hierarchicalAccess();
+    assertEquals(3, access.count());
+
+    // Column added with add(name, type): expected to be REQUIRED, not nullable.
+
+    crossCheck(access, 0, "c", MinorType.INT);
+    assertEquals(DataMode.REQUIRED, access.column(0).getDataMode());
+    assertEquals(DataMode.REQUIRED, access.column(0).getType().getMode());
+    assertTrue(! access.column(0).isNullable());
+
+    // Repeated column: expected to be REPEATED, not nullable.
+
+    crossCheck(access, 1, "a", MinorType.INT);
+    assertEquals(DataMode.REPEATED, access.column(1).getDataMode());
+    assertEquals(DataMode.REPEATED, access.column(1).getType().getMode());
+    assertTrue(! access.column(1).isNullable());
+
+    // Column added with addNullable(): expected to be OPTIONAL and nullable.
+
+    crossCheck(access, 2, "b", MinorType.VARCHAR);
+    assertEquals(MinorType.VARCHAR, access.column(2).getType().getMinorType());
+    assertEquals(DataMode.OPTIONAL, access.column(2).getDataMode());
+    assertEquals(DataMode.OPTIONAL, access.column(2).getType().getMode());
+    assertTrue(access.column(2).isNullable());
+
+    // No maps: physical schema is the same as access schema.
+
+    PhysicalSchema physical = schema.physical();
+    assertEquals(3, physical.count());
+    assertEquals("c", physical.column(0).field().getName());
+    assertEquals("a", physical.column(1).field().getName());
+    assertEquals("b", physical.column(2).field().getName());
+  }
+
+  /**
+   * Checks that a schema column can be found consistently by index and by
+   * full name, and that it has the expected simple name and minor type.
+   *
+   * @param schema the schema to inspect
+   * @param index expected index of the column within the schema
+   * @param fullName dot-separated full name of the column; its last
+   * segment is the expected simple name
+   * @param type expected minor type of the column
+   */
+
+  public void crossCheck(TupleSchema schema, int index, String fullName, MinorType type) {
+    // Walk the dot-separated segments; the final one is the simple name.
+    String leafName = null;
+    for (String segment : Splitter.on(".").split(fullName)) {
+      leafName = segment;
+    }
+
+    // Name and index must agree whichever way the column is looked up.
+    assertEquals(leafName, schema.column(index).getName());
+    assertEquals(index, schema.columnIndex(fullName));
+    assertSame(schema.column(index), schema.column(fullName));
+
+    assertEquals(type, schema.column(index).getType().getMinorType());
+  }
+
+  /**
+   * Test a schema with nested maps: top-level columns c, a (a map) and h,
+   * where map a holds b, d, e (a nested map holding f) and g. Verifies the
+   * flattened access schema (leaf columns addressed by dotted full name,
+   * plus the list of maps) and the physical schema, which is expected to
+   * keep the nesting.
+   */
+
+  @Test
+  public void testMapSchema() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("c", MinorType.INT)
+        .addMap("a")
+          .addNullable("b", MinorType.VARCHAR)
+          .add("d", MinorType.INT)
+          .addMap("e")
+            .add("f", MinorType.VARCHAR)
+            .buildMap()
+          .add("g", MinorType.INT)
+          .buildMap()
+        .add("h", MinorType.BIGINT)
+        .build();
+
+    RowSetSchema schema = new RowSetSchema(batchSchema);
+
+    // Access schema: flattened with maps removed
+
+    FlattenedSchema access = schema.flatAccess();
+    assertEquals(6, access.count());
+    crossCheck(access, 0, "c", MinorType.INT);
+    crossCheck(access, 1, "a.b", MinorType.VARCHAR);
+    crossCheck(access, 2, "a.d", MinorType.INT);
+    crossCheck(access, 3, "a.e.f", MinorType.VARCHAR);
+    crossCheck(access, 4, "a.g", MinorType.INT);
+    crossCheck(access, 5, "h", MinorType.BIGINT);
+
+    // Should have two maps.
+
+    assertEquals(2, access.mapCount());
+    assertEquals("a", access.map(0).getName());
+    assertEquals("e", access.map(1).getName());
+    assertEquals(0, access.mapIndex("a"));
+    assertEquals(1, access.mapIndex("a.e"));
+
+    // Verify physical schema: should mirror the schema created above.
+
+    PhysicalSchema physical = schema.physical();
+    assertEquals(3, physical.count());
+    assertEquals("c", physical.column(0).field().getName());
+    assertEquals("c", physical.column(0).fullName());
+    assertFalse(physical.column(0).isMap());
+    assertNull(physical.column(0).mapSchema());
+
+    assertEquals("a", physical.column(1).field().getName());
+    assertEquals("a", physical.column(1).fullName());
+    assertTrue(physical.column(1).isMap());
+    assertNotNull(physical.column(1).mapSchema());
+
+    assertEquals("h", physical.column(2).field().getName());
+    assertEquals("h", physical.column(2).fullName());
+    assertFalse(physical.column(2).isMap());
+    assertNull(physical.column(2).mapSchema());
+
+    // Members of map "a", in declaration order; full names carry the "a." prefix.
+
+    PhysicalSchema aSchema = physical.column(1).mapSchema();
+    assertEquals(4, aSchema.count());
+    assertEquals("b", aSchema.column(0).field().getName());
+    assertEquals("a.b", aSchema.column(0).fullName());
+    assertEquals("d", aSchema.column(1).field().getName());
+    assertEquals("e", aSchema.column(2).field().getName());
+    assertEquals("g", aSchema.column(3).field().getName());
+
+    // Members of the nested map "a.e".
+
+    PhysicalSchema eSchema = aSchema.column(2).mapSchema();
+    assertEquals(1, eSchema.count());
+    assertEquals("f", eSchema.column(0).field().getName());
+    assertEquals("a.e.f", eSchema.column(0).fullName());
+  }
+
+  /**
+   * Runs the write-then-read checks for each scalar numeric type in turn:
+   * TINYINT, SMALLINT, INT, BIGINT, FLOAT4 and FLOAT8. The checks run
+   * sequentially, so a failure in one prevents the later ones from running.
+   */
+
+  @Test
+  public void testScalarReaderWriter() {
+    testTinyIntRW();
+    testSmallIntRW();
+    testIntRW();
+    testLongRW();
+    testFloatRW();
+    testDoubleRW();
+  }
+
+  /**
+   * Writes 0, Byte.MAX_VALUE and Byte.MIN_VALUE to a single TINYINT column
+   * and verifies that the reader returns the same three values, in order,
+   * via getInt(), followed by end-of-data.
+   */
+
+  private void testTinyIntRW() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("col", MinorType.TINYINT)
+        .build();
+    SingleRowSet rs = fixture.rowSetBuilder(batchSchema)
+        .add(0)
+        .add(Byte.MAX_VALUE)
+        .add(Byte.MIN_VALUE)
+        .build();
+    try {
+      RowSetReader reader = rs.reader();
+      assertTrue(reader.next());
+      assertEquals(0, reader.column(0).getInt());
+      assertTrue(reader.next());
+      assertEquals(Byte.MAX_VALUE, reader.column(0).getInt());
+      assertTrue(reader.next());
+      assertEquals(Byte.MIN_VALUE, reader.column(0).getInt());
+      assertFalse(reader.next());
+    } finally {
+      // Clear the row set even when an assertion fails, so that nothing is
+      // left behind in the allocator of the shared fixture.
+      rs.clear();
+    }
+  }
+
+  /**
+   * Writes 0, Short.MAX_VALUE and Short.MIN_VALUE to a single SMALLINT
+   * column and verifies that the reader returns the same three values, in
+   * order, via getInt(), followed by end-of-data.
+   */
+
+  private void testSmallIntRW() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("col", MinorType.SMALLINT)
+        .build();
+    SingleRowSet rs = fixture.rowSetBuilder(batchSchema)
+        .add(0)
+        .add(Short.MAX_VALUE)
+        .add(Short.MIN_VALUE)
+        .build();
+    try {
+      RowSetReader reader = rs.reader();
+      assertTrue(reader.next());
+      assertEquals(0, reader.column(0).getInt());
+      assertTrue(reader.next());
+      assertEquals(Short.MAX_VALUE, reader.column(0).getInt());
+      assertTrue(reader.next());
+      assertEquals(Short.MIN_VALUE, reader.column(0).getInt());
+      assertFalse(reader.next());
+    } finally {
+      // Clear the row set even when an assertion fails, so that nothing is
+      // left behind in the allocator of the shared fixture.
+      rs.clear();
+    }
+  }
+
+  /**
+   * Writes 0, Integer.MAX_VALUE and Integer.MIN_VALUE to a single INT
+   * column and verifies that the reader returns the same three values, in
+   * order, via getInt(), followed by end-of-data.
+   */
+
+  private void testIntRW() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("col", MinorType.INT)
+        .build();
+    SingleRowSet rs = fixture.rowSetBuilder(batchSchema)
+        .add(0)
+        .add(Integer.MAX_VALUE)
+        .add(Integer.MIN_VALUE)
+        .build();
+    try {
+      RowSetReader reader = rs.reader();
+      assertTrue(reader.next());
+      assertEquals(0, reader.column(0).getInt());
+      assertTrue(reader.next());
+      assertEquals(Integer.MAX_VALUE, reader.column(0).getInt());
+      assertTrue(reader.next());
+      assertEquals(Integer.MIN_VALUE, reader.column(0).getInt());
+      assertFalse(reader.next());
+    } finally {
+      // Clear the row set even when an assertion fails, so that nothing is
+      // left behind in the allocator of the shared fixture.
+      rs.clear();
+    }
+  }
+
+  /**
+   * Writes 0, Long.MAX_VALUE and Long.MIN_VALUE to a single BIGINT column
+   * and verifies that the reader returns the same three values, in order,
+   * via getLong(), followed by end-of-data.
+   */
+
+  private void testLongRW() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("col", MinorType.BIGINT)
+        .build();
+    SingleRowSet rs = fixture.rowSetBuilder(batchSchema)
+        .add(0L)
+        .add(Long.MAX_VALUE)
+        .add(Long.MIN_VALUE)
+        .build();
+    try {
+      RowSetReader reader = rs.reader();
+      assertTrue(reader.next());
+      assertEquals(0, reader.column(0).getLong());
+      assertTrue(reader.next());
+      assertEquals(Long.MAX_VALUE, reader.column(0).getLong());
+      assertTrue(reader.next());
+      assertEquals(Long.MIN_VALUE, reader.column(0).getLong());
+      assertFalse(reader.next());
+    } finally {
+      // Clear the row set even when an assertion fails, so that nothing is
+      // left behind in the allocator of the shared fixture.
+      rs.clear();
+    }
+  }
+
+  private void testFloatRW() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("col", MinorType.FLOAT4)
+        .build();
+    SingleRowSet rs = fixture.rowSetBuilder(batchSchema)
+        .add(0F)
+        .add(Float.MAX_VALUE)
+        .add(Float.MIN_VALUE)
+        .build();
+    RowSetReader reader = rs.reader();
+    assertTrue(reader.next());
+    assertEquals(0, reader.column(0).getDouble(), 0.000001);
+    assertTrue(reader.next());
+    assertEquals(Float.MAX_VALUE, reader.column(0).getDouble(), 0.000001);
+    assertTrue(reader.next());
+    assertEquals(Float.MIN_VALUE, reader.column(0).getDouble(), 0.000001);
+    assertFalse(reader.next());
+    rs.clear();
+  }
+
+  private void testDoubleRW() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("col", MinorType.FLOAT8)
+        .build();
+    SingleRowSet rs = fixture.rowSetBuilder(batchSchema)
+        .add(0D)
+        .add(Double.MAX_VALUE)
+        .add(Double.MIN_VALUE)
+        .build();
+    RowSetReader reader = rs.reader();
+    assertTrue(reader.next());
+    assertEquals(0, reader.column(0).getDouble(), 0.000001);
+    assertTrue(reader.next());
+    assertEquals(Double.MAX_VALUE, reader.column(0).getDouble(), 0.000001);
+    assertTrue(reader.next());
+    assertEquals(Double.MIN_VALUE, reader.column(0).getDouble(), 0.000001);
+    assertFalse(reader.next());
+    rs.clear();
+  }
+
+  @Test
+  public void testMap() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("b")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.INT)
+          .buildMap()
+        .build();
+    SingleRowSet rs = fixture.rowSetBuilder(batchSchema)
+        .add(10, 20, 30)
+        .add(40, 50, 60)
+        .build();
+    RowSetReader reader = rs.reader();
+    assertTrue(reader.next());
+    assertEquals(10, reader.column(0).getInt());
+    assertEquals(20, reader.column(1).getInt());
+    assertEquals(30, reader.column(2).getInt());
+    assertEquals(10, reader.column("a").getInt());
+    assertEquals(30, reader.column("b.d").getInt());
+    assertTrue(reader.next());
+    assertEquals(40, reader.column(0).getInt());
+    assertEquals(50, reader.column(1).getInt());
+    assertEquals(60, reader.column(2).getInt());
+    assertFalse(reader.next());
+    rs.clear();
+  }
+
+  @Test
+  public void TestTopScalarArray() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("c", MinorType.INT)
+        .addArray("a", MinorType.INT)
+        .build();
+
+    ExtendableRowSet rs1 = fixture.rowSet(batchSchema);
+    RowSetWriter writer = rs1.writer();
+    writer.column(0).setInt(10);
+    ArrayWriter array = writer.column(1).array();
+    array.setInt(100);
+    array.setInt(110);
+    writer.save();
+    writer.column(0).setInt(20);
+    array = writer.column(1).array();
+    array.setInt(200);
+    array.setInt(120);
+    array.setInt(220);
+    writer.save();
+    writer.column(0).setInt(30);
+    writer.save();
+    writer.done();
+
+    RowSetReader reader = rs1.reader();
+    assertTrue(reader.next());
+    assertEquals(10, reader.column(0).getInt());
+    ArrayReader arrayReader = reader.column(1).array();
+    assertEquals(2, arrayReader.size());
+    assertEquals(100, arrayReader.getInt(0));
+    assertEquals(110, arrayReader.getInt(1));
+    assertTrue(reader.next());
+    assertEquals(20, reader.column(0).getInt());
+    arrayReader = reader.column(1).array();
+    assertEquals(3, arrayReader.size());
+    assertEquals(200, arrayReader.getInt(0));
+    assertEquals(120, arrayReader.getInt(1));
+    assertEquals(220, arrayReader.getInt(2));
+    assertTrue(reader.next());
+    assertEquals(30, reader.column(0).getInt());
+    arrayReader = reader.column(1).array();
+    assertEquals(0, arrayReader.size());
+    assertFalse(reader.next());
+
+    SingleRowSet rs2 = fixture.rowSetBuilder(batchSchema)
+      .add(10, new int[] {100, 110})
+      .add(20, new int[] {200, 120, 220})
+      .add(30, null)
+      .build();
+
+    new RowSetComparison(rs1)
+      .verifyAndClear(rs2);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3e8b01d5/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/package-info.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/package-info.java
new file mode 100644
index 0000000..4d44275
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Tests for the row set test fixture. Yes, very meta.
+ */
+package org.apache.drill.test.rowSet.test;


[2/3] drill git commit: DRILL-5323: Test tools for row sets

Posted by su...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
new file mode 100644
index 0000000..48657c7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Builder of a row set schema expressed as a list of materialized
+ * fields. Optimized for use when creating schemas by hand in tests.
+ * <p>
+ * Example usage to create the following schema: <br>
+ * <tt>(c: INT, a: MAP(b: nullable VARCHAR, d: INT, e: MAP(f: VARCHAR), g: INT), h: array of BIGINT)</tt>
+ * <p>
+ * Code:<pre><code>
+ *     BatchSchema batchSchema = new SchemaBuilder()
+ *        .add("c", MinorType.INT)
+ *        .addMap("a")
+ *          .addNullable("b", MinorType.VARCHAR)
+ *          .add("d", MinorType.INT)
+ *          .addMap("e")
+ *            .add("f", MinorType.VARCHAR)
+ *            .buildMap()
+ *          .add("g", MinorType.INT)
+ *          .buildMap()
+ *        .addArray("h", MinorType.BIGINT)
+ *        .build();
+ * </code></pre>
+ */
+
+public class SchemaBuilder {
+
+  /**
+   * Internal structure for building a map. A map is just a schema,
+   * but one that is part of a parent column.
+   */
+
+  public static class MapBuilder extends SchemaBuilder {
+    private final SchemaBuilder parent;
+    private final String memberName;
+
+    public MapBuilder(SchemaBuilder parent, String memberName) {
+      this.parent = parent;
+      this.memberName = memberName;
+    }
+
+    @Override
+    public BatchSchema build() {
+      throw new IllegalStateException("Cannot build for a nested schema");
+    }
+
+    @Override
+    public SchemaBuilder buildMap() {
+      MaterializedField col = MaterializedField.create(memberName,
+          MajorType.newBuilder()
+            .setMinorType(MinorType.MAP)
+            .setMode(DataMode.REQUIRED)
+            .build());
+      for (MaterializedField childCol : columns) {
+        col.addChild(childCol);
+      }
+      parent.finishMap(col);
+      return parent;
+    }
+  }
+
+  protected List<MaterializedField> columns = new ArrayList<>( );
+
+  public SchemaBuilder() { }
+
+  public SchemaBuilder add(String pathName, MajorType type) {
+    MaterializedField col = MaterializedField.create(pathName, type);
+    columns.add(col);
+    return this;
+  }
+
+  public SchemaBuilder add(String pathName, MinorType type, DataMode mode) {
+    return add(pathName, MajorType.newBuilder()
+        .setMinorType(type)
+        .setMode(mode)
+        .build());
+  }
+
+  public SchemaBuilder add(String pathName, MinorType type) {
+    return add(pathName, type, DataMode.REQUIRED);
+  }
+
+  public SchemaBuilder addNullable(String pathName, MinorType type) {
+    return add(pathName, type, DataMode.OPTIONAL);
+  }
+
+  public SchemaBuilder addArray(String pathName, MinorType type) {
+    return add(pathName, type, DataMode.REPEATED);
+  }
+
+  /**
+   * Add a map column. The returned schema builder is for the nested
+   * map. Building that map, using {@link MapBuilder#buildMap()},
+   * will return the original schema builder.
+   *
+   * @param pathName the name of the map column
+   * @return a builder for the map
+   */
+
+  public MapBuilder addMap(String pathName) {
+    return new MapBuilder(this, pathName);
+  }
+
+  public BatchSchema build() {
+    return new BatchSchema(SelectionVectorMode.NONE, columns);
+  }
+
+  void finishMap(MaterializedField map) {
+    columns.add(map);
+  }
+
+  public SchemaBuilder buildMap() {
+    throw new IllegalStateException("Cannot build map for a top-level schema");
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/package-info.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/package-info.java
new file mode 100644
index 0000000..0236a20
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/package-info.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides a set of tools to work with row sets when creating operator
+ * and "sub-operator" unit tests. A row set is a batch of Drill vectors,
+ * often called a "record batch." However, a record batch, in Drill, means
+ * not just the data, but also an operator on that data. The classes
+ * here work with the data itself, and can be used to test implementations
+ * of things such as code generated classes and so on.
+ * <p>
+ * The classes include tools for reading and writing row sets, comparing
+ * actual and expected results, and so on.
+ * <p>
+ * Drill defines a variety of record batch semantics, modeled here as
+ * distinct row set classes:
+ * <dl>
+ * <dt>RowSet</dt>
+ * <dd>The abstract definition of a row set that defines operations available
+ * on all row sets.</dd>
+ * <dt>SingleRowSet (abstract)</dt>
+ * <dd>Represents a row set that contains a single record batch (the typical
+ * case).</dd>
+ * <dt>DirectRowSet</dt>
+ * <dd>A read-only single row set without a selection vector.</dd>
+ * <dt>IndirectRowSet</dt>
+ * <dd>A read-only, single row set with an SV2. Note that the SV2 itself is
+ * writable (such as for sorting.)</dd>
+ * <dt>ExtendableRowSet</dt>
+ * <dd>A write-only, single row set used to create a new row set. Because of
+ * the way Drill sets row counts, an extendible row set cannot be read; instead
+ * at the completion of the write the extendible row set becomes a direct or
+ * indirect row set.</dd>
+ * <dt>HyperRowSet</dt>
+ * <dd>A read-only row set made up of a collection of record batches, indexed via an
+ * SV4. As with the SV2, the SV4 itself is writable.</dd>
+ * </dl>
+ * This package contains a number of helper classes:
+ * <dl>
+ * <dt>RowSetWriter</dt>
+ * <dd>Writes data into an extendible row set.</dd>
+ * <dt>RowSetReader</dt>
+ * <dd>Reads data from any but an extendible row set.</dd>
+ * <dt>RowSetBuilder</dt>
+ * <dd>Creates and populates a row set in a fluent builder style.</dd>
+ * <dt>RowSetPrinter</dt>
+ * <dd>Prints a row set to stdout in a CSV-like form for easy debugging.</dd>
+ * <dt>RowSetComparison</dt>
+ * <dd>Used in tests to compare an "actual" row set against an "expected"
+ * row set. Does a complete check of row counts, types and values. If values
+ * are arrays (repeated), does a check of the entire array. Uses JUnit assertions
+ * to report comparison failures.</dd>
+ * <dt>SchemaBuilder</dt>
+ * <dd>Drill normally writes data to vectors, then "discovers" the row set schema based on the
+ * data written. For tests, it is usually far easier to simply declare a schema, then
+ * read and write data according to that schema. The schema builder provides a simple,
+ * fluent tool to create a row set schema. That schema then drives the row set readers
+ * and writers, the row set printer and the row set comparison.</dd>
+ * </dl>
+ */
+
+package org.apache.drill.test.rowSet;


[3/3] drill git commit: DRILL-5323: Test tools for row sets

Posted by su...@apache.org.
DRILL-5323: Test tools for row sets

Provide test tools to create, populate and compare row sets

To simplify tests, we need a TestRowSet concept that wraps a
VectorContainer and provides easy ways to:

- Define a schema for the row set.
- Create a set of vectors that implement the schema.
- Populate the row set with test data via code.
- Add an SV2 to the row set.
- Pass the row set to operator components (such as generated code
blocks.)
- Examine the contents of a row set
- Compare the results of the operation with an expected result set.
- Dispose of the underlying direct memory when work is done.

This code builds on that in DRILL-5324 to provide a complete row set
API. See DRILL-5318 for the spec.

Note: this code can be reviewed as-is, but cannot be committed until
after DRILL-5324 is committed: this code has compile-time dependencies
on that code. This PR will be rebased once DRILL-5324 is pulled into
master.

Handles maps and intervals

The row set schema is refined to provide two forms of schema. A
physical schema shows the nested structure of the data with maps
expanding into their contents.

Updates the row set schema builder to easily build a schema with maps.

An access schema shows the row “flattened” to include just scalar
(non-map) columns, with all columns at a single level, with dotted
names identifying nested fields. This form makes for very simple access.

Then, provides tools for reading and writing batches with maps by
presenting the flattened view to the row reader and writer.

HyperVectors have a very complex structure for maps. The hyper row set
implementation takes a first crack at mapping that structure into the
standardized row set format.

Also provides a handy way to set an INTERVAL column from an int. There
is no good mapping from an int to an interval, so an arbitrary
convention is used. This convention is not generally useful, but is
very handy for quickly generating test data.

As before, this is a partial PR. The code here still depends on
DRILL-5324 to provide the column accessors needed by the row reader and
writer.

All this code is getting rather complex, so this commit includes a unit
test of the schema and row set code.

Revisions to support arrays

Arrays require a somewhat different API. Refactored to allow arrays to
appear as a field type.

While refactoring, moved interfaces to more logical locations.

Added more comments.

Rejiggered the row set schema to provide both a physical and flattened
(access) schema, both driven from the original batch schema.

Pushed some accessor and writer classes into the accessor layer.

Added tests for arrays.

Also added more comments where needed.

Moved tests to DRILL-5318

The test classes previously here depend on the new “operator fixture”.
To provide a non-cyclic checkin order, moved the tests to the PR with
the fixtures so that this PR is clear of dependencies. The tests were
reviewed in the context of DRILL-5318.

Also pulls in batch sizer support for map fields which are required by
the tests.

closes #785


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/095a660b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/095a660b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/095a660b

Branch: refs/heads/master
Commit: 095a660b337e3fbdd09c13e220a545617aaf922c
Parents: 381eab6
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue Mar 14 16:18:24 2017 -0700
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Fri Apr 21 14:51:36 2017 -0700

----------------------------------------------------------------------
 .../physical/impl/spill/RecordBatchSizer.java   |  53 +++-
 .../apache/drill/exec/record/BatchSchema.java   |   5 +-
 .../exec/record/VectorAccessibleUtilities.java  |  51 ++++
 .../drill/test/rowSet/AbstractRowSet.java       | 164 ++++++++++
 .../drill/test/rowSet/AbstractSingleRowSet.java | 217 +++++++++++++
 .../apache/drill/test/rowSet/DirectRowSet.java  | 236 ++++++++++++++
 .../drill/test/rowSet/HyperRowSetImpl.java      | 292 ++++++++++++++++++
 .../drill/test/rowSet/IndirectRowSet.java       | 125 ++++++++
 .../org/apache/drill/test/rowSet/RowSet.java    | 198 ++++++++++++
 .../apache/drill/test/rowSet/RowSetBuilder.java |  85 ++++++
 .../drill/test/rowSet/RowSetComparison.java     | 244 +++++++++++++++
 .../apache/drill/test/rowSet/RowSetPrinter.java | 101 ++++++
 .../apache/drill/test/rowSet/RowSetSchema.java  | 304 +++++++++++++++++++
 .../drill/test/rowSet/RowSetUtilities.java      | 106 +++++++
 .../apache/drill/test/rowSet/SchemaBuilder.java | 142 +++++++++
 .../apache/drill/test/rowSet/package-info.java  |  76 +++++
 16 files changed, 2385 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index b384e0a..4cb2bae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -20,14 +20,17 @@ package org.apache.drill.exec.physical.impl.spill;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
 
 /**
  * Given a record batch or vector container, determines the actual memory
@@ -66,16 +69,15 @@ public class RecordBatchSizer {
     public int density;
     public int dataSize;
 
-    @SuppressWarnings("resource")
-    public ColumnSize(VectorWrapper<?> vw) {
-      metadata = vw.getField();
+    public ColumnSize(ValueVector v) {
+      metadata = v.getField();
       stdSize = TypeHelper.getSize(metadata.getType());
 
       // Can't get size estimates if this is an empty batch.
 
-      ValueVector v = vw.getValueVector();
       int rowCount = v.getAccessor().getValueCount();
       if (rowCount == 0) {
+        estSize = stdSize;
         return;
       }
 
@@ -128,7 +130,7 @@ public class RecordBatchSizer {
     }
   }
 
-  List<ColumnSize> columnSizes = new ArrayList<>();
+  private List<ColumnSize> columnSizes = new ArrayList<>();
 
   /**
    * Number of records (rows) in the batch.
@@ -159,7 +161,17 @@ public class RecordBatchSizer {
 
   private int netBatchSize;
 
+  public RecordBatchSizer(RecordBatch batch) {
+    this(batch,
+         (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) ?
+         batch.getSelectionVector2() : null);
+  }
+
   public RecordBatchSizer(VectorAccessible va) {
+    this(va, null);
+  }
+
+  public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) {
     rowCount = va.getRecordCount();
     for (VectorWrapper<?> vw : va) {
       measureColumn(vw);
@@ -169,12 +181,9 @@ public class RecordBatchSizer {
       grossRowWidth = roundUp(totalBatchSize, rowCount);
     }
 
-    hasSv2 = va.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE;
-    if (hasSv2) {
-      @SuppressWarnings("resource")
-      SelectionVector2 sv2 = va.getSelectionVector2();
+    if (sv2 != null) {
       sv2Size = sv2.getBuffer(false).capacity();
-      grossRowWidth += sv2Size / rowCount;
+      grossRowWidth += roundUp(sv2Size, rowCount);
       netRowWidth += 2;
     }
 
@@ -200,7 +209,19 @@ public class RecordBatchSizer {
   }
 
   private void measureColumn(VectorWrapper<?> vw) {
-    ColumnSize colSize = new ColumnSize(vw);
+    measureColumn(vw.getValueVector());
+  }
+
+  private void measureColumn(ValueVector v) {
+
+    // Maps consume no size themselves. However, their contained
+    // vectors do consume space, so visit columns recursively.
+
+    if (v.getField().getType().getMinorType() == MinorType.MAP) {
+      expandMap((AbstractMapVector) v);
+      return;
+    }
+    ColumnSize colSize = new ColumnSize(v);
     columnSizes.add(colSize);
 
     stdRowWidth += colSize.stdSize;
@@ -209,6 +230,12 @@ public class RecordBatchSizer {
     netRowWidth += colSize.estSize;
   }
 
+  private void expandMap(AbstractMapVector mapVector) {
+    for (ValueVector vector : mapVector) {
+      measureColumn(vector);
+    }
+  }
+
   public static int roundUp(int num, int denom) {
     if(denom == 0) {
       return 0;
@@ -240,8 +267,10 @@ public class RecordBatchSizer {
     buf.append(rowCount);
     buf.append(", Total size: ");
     buf.append(totalBatchSize);
-    buf.append(", Row width:");
+    buf.append(", Gross row width:");
     buf.append(grossRowWidth);
+    buf.append(", Net row width:");
+    buf.append(netRowWidth);
     buf.append(", Density:");
     buf.append(avgDensity);
     buf.append("}");

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 3591148..168995d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -28,10 +28,11 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 
 public class BatchSchema implements Iterable<MaterializedField> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
-  final SelectionVectorMode selectionVectorMode;
+
+  private final SelectionVectorMode selectionVectorMode;
   private final List<MaterializedField> fields;
 
-  BatchSchema(SelectionVectorMode selectionVector, List<MaterializedField> fields) {
+  public BatchSchema(SelectionVectorMode selectionVector, List<MaterializedField> fields) {
     this.fields = fields;
     this.selectionVectorMode = selectionVector;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
new file mode 100644
index 0000000..12b9053
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.AllocationHelper;
+
+/**
+ * VectorAccessible is an interface. Yet, several operations are done
+ * on VectorAccessible over and over again. While Java 8 allows static
+ * methods on an interface, Drill uses Java 7, which does not. This
+ * class is a placeholder for common VectorAccessible methods that
+ * can migrate into the interface when Drill upgrades to Java 8.
+ */
+
+public class VectorAccessibleUtilities {
+
+  private VectorAccessibleUtilities() { }
+
+  public static void clear(VectorAccessible va) {
+    for (final VectorWrapper<?> w : va) {
+      w.clear();
+    }
+  }
+
+  public static void setValueCount(VectorAccessible va, int count) {
+    for (VectorWrapper<?> w: va) {
+      w.getValueVector().getMutator().setValueCount(count);
+    }
+  }
+
+  public static void allocateVectors(VectorAccessible va, int targetRecordCount) {
+    for (VectorWrapper<?> w: va) {
+      AllocationHelper.allocateNew(w.getValueVector(), targetRecordCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
new file mode 100644
index 0000000..a32262a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnAccessor.RowIndex;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
+import org.apache.drill.exec.vector.accessor.impl.TupleReaderImpl;
+
+/**
+ * Basic implementation of a row set for both the single and multiple
+ * (hyper) varieties, both the fixed and extendible varieties.
+ */
+
+public abstract class AbstractRowSet implements RowSet {
+
+  /**
+   * Row set index base class used when indexing rows within a row
+   * set for a row set reader. Keeps track of the current position,
+   * which starts before the first row, meaning that the client
+   * must call <tt>next()</tt> to advance to the first row.
+   */
+
+  public static abstract class RowSetIndex implements RowIndex {
+    protected int rowIndex = -1; // -1 means "before the first row"
+    // Subclasses decide how the position advances and where it is bounded.
+    public int position() { return rowIndex; } // current position exactly as stored
+    public abstract boolean next(); // advance the position; the result is defined by the subclass
+    public abstract int size();
+    public abstract boolean valid();
+    public void set(int index) { rowIndex = index; } // repositions directly; no bounds check is made here
+  }
+
+  /**
+   * Bounded (read-only) version of the row set index. When reading,
+   * the row count is fixed, and set here.
+   */
+
+  public static abstract class BoundedRowIndex extends RowSetIndex {
+    /** Fixed number of rows this index may visit. */
+    protected final int rowCount;
+    /** Records the row count; the position keeps its inherited starting value of -1. */
+    public BoundedRowIndex(int rowCount) {
+      this.rowCount = rowCount;
+    }
+    /** Advances one row. Once the last row has been reached, returns false and leaves the position unchanged. */
+    @Override
+    public boolean next() {
+      if (++rowIndex < rowCount ) {
+        return true;
+      } else {
+        rowIndex--;
+        return false;
+      }
+    }
+    /** Returns the fixed row count given at construction. */
+    @Override
+    public int size() { return rowCount; }
+    /** Checks the upper bound only, so the initial position of -1 also reports valid for any non-negative row count. */
+    @Override
+    public boolean valid() { return rowIndex < rowCount; }
+  }
+
+  /**
+   * Reader implementation for a row set.
+   */
+
+  public class RowSetReaderImpl extends TupleReaderImpl implements RowSetReader {
+    /** Row index to which every navigation method below delegates. */
+    protected final RowSetIndex index;
+    /** Hands the schema and column readers to the tuple reader base class and keeps the index. */
+    public RowSetReaderImpl(TupleSchema schema, RowSetIndex index, AbstractColumnReader[] readers) {
+      super(schema, readers);
+      this.index = index;
+    }
+    /** Advances the underlying index and returns whatever {@code index.next()} returns. */
+    @Override
+    public boolean next() { return index.next(); }
+    /** Reports the underlying index's {@code valid()} result. */
+    @Override
+    public boolean valid() { return index.valid(); }
+    /** Returns the index's {@code position()}, which need not equal {@link #rowIndex()}. */
+    @Override
+    public int index() { return index.position(); }
+    /** Returns the size reported by the underlying index. */
+    @Override
+    public int size() { return index.size(); }
+    /** Returns the value of the underlying index's {@code index()} method. */
+    @Override
+    public int rowIndex() { return index.index(); }
+    /** Returns the value of the underlying index's {@code batch()} method. */
+    @Override
+    public int batchIndex() { return index.batch(); }
+    /** Repositions the underlying index; the parameter shadows the field, hence {@code this.index}. */
+    @Override
+    public void set(int index) { this.index.set(index); }
+  }
+
+  protected final BufferAllocator allocator;
+  protected final RowSetSchema schema;
+  protected final VectorContainer container;
+  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+
+  public AbstractRowSet(BufferAllocator allocator, BatchSchema schema, VectorContainer container) {
+    this.allocator = allocator;
+    this.schema = new RowSetSchema(schema);
+    this.container = container;
+  }
+
+  @Override
+  public VectorAccessible vectorAccessible() { return container; }
+
+  @Override
+  public VectorContainer container() { return container; }
+
+  @Override
+  public int rowCount() { return container.getRecordCount(); }
+
+  @Override
+  public void clear() {
+    container.zeroVectors(); // NOTE(review): presumably releases the vectors' buffers — confirm against VectorContainer
+    container.setRecordCount(0); // the container then reports zero rows
+  }
+
+  @Override
+  public RowSetSchema schema() { return schema; }
+
+  @Override
+  public BufferAllocator allocator() { return allocator; }
+
+  @Override
+  public void print() {
+    new RowSetPrinter(this).print();
+  }
+
+  @Override
+  public int size() { // unsupported at this level; AbstractSingleRowSet overrides it with a real size
+    throw new UnsupportedOperationException("getSize");
+  }
+
+  @Override
+  public BatchSchema batchSchema() {
+    return container.getSchema();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
new file mode 100644
index 0000000..d8176de
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
+import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
+import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn;
+import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
+
+/**
+ * Base class for row sets backed by a single record batch.
+ */
+
+public abstract class AbstractSingleRowSet extends AbstractRowSet implements SingleRowSet {
+
+  /**
+   * Internal helper class to organize a set of value vectors for use by the
+   * row set class. Subclasses either build vectors from a schema, or map an
+   * existing vector container into the row set structure. The row set
+   * structure is based on a flattened structure; all vectors appear in
+   * a single vector array. Maps are set aside in a separate map list.
+   */
+
+  public abstract static class StructureBuilder {
+    protected final PhysicalSchema schema; // physical view of the row set schema
+    protected final BufferAllocator allocator;
+    protected final ValueVector[] valueVectors; // non-map vectors, filled in by the subclasses
+    protected final MapVector[] mapVectors; // map vectors; null when the schema has no maps
+    protected int vectorIndex; // next free slot in valueVectors
+    protected int mapIndex; // next free slot in mapVectors
+    /** Sizes both arrays from the flattened view of {@code schema}; subclasses fill them. */
+    public StructureBuilder(BufferAllocator allocator, RowSetSchema schema) {
+      this.allocator = allocator;
+      this.schema = schema.physical();
+      FlattenedSchema flatSchema = schema.flatAccess();
+      valueVectors = new ValueVector[flatSchema.count()];
+      if (flatSchema.mapCount() == 0) {
+        mapVectors = null;
+      } else {
+        mapVectors = new MapVector[flatSchema.mapCount()];
+      }
+    }
+  }
+
+  /**
+   * Create a set of value vectors given a schema, then map them into both
+   * the value container and the row set structure.
+   */
+
+  public static class VectorBuilder extends StructureBuilder {
+    /** Creates a builder; the base class sizes the vector arrays. */
+    public VectorBuilder(BufferAllocator allocator, RowSetSchema schema) {
+      super(allocator, schema);
+    }
+    /** Creates a vector for each top-level column, adds it to {@code container}, descends into maps, builds the container schema, and returns the non-map vectors. */
+    public ValueVector[] buildContainer(VectorContainer container) {
+      for (int i = 0; i < schema.count(); i++) {
+        LogicalColumn colSchema = schema.column(i);
+        @SuppressWarnings("resource")
+        ValueVector v = TypeHelper.getNewVector(colSchema.field, allocator, null);
+        container.add(v); // only top-level vectors are added to the container itself
+        if (colSchema.field.getType().getMinorType() == MinorType.MAP) {
+          MapVector mv = (MapVector) v;
+          mapVectors[mapIndex++] = mv;
+          buildMap(mv, colSchema.mapSchema);
+        } else {
+          valueVectors[vectorIndex++] = v;
+        }
+      }
+      container.buildSchema(SelectionVectorMode.NONE); // no selection vector
+      return valueVectors;
+    }
+    /** Creates the member vectors of {@code mapVector} as described by {@code mapSchema}, recursing into nested maps. */
+    private void buildMap(MapVector mapVector, PhysicalSchema mapSchema) {
+      for (int i = 0; i < mapSchema.count(); i++) {
+        LogicalColumn colSchema = mapSchema.column(i);
+        MajorType type = colSchema.field.getType();
+        Class<? extends ValueVector> vectorClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
+        @SuppressWarnings("resource")
+        ValueVector v = mapVector.addOrGet(colSchema.field.getName(), type, vectorClass); // the member vector is obtained from the map itself
+        if (type.getMinorType() == MinorType.MAP) {
+          MapVector mv = (MapVector) v;
+          mapVectors[mapIndex++] = mv;
+          buildMap(mv, colSchema.mapSchema);
+        } else {
+          valueVectors[vectorIndex++] = v;
+        }
+      }
+    }
+  }
+
+  /**
+   * Build a row set given an existing vector container. In this case,
+   * the vectors exist and we simply need to pull them out of the container
+   * and maps and put them into the row set arrays.
+   */
+
+  public static class VectorMapper extends StructureBuilder {
+    /** Creates a mapper; the base class sizes the vector arrays from {@code schema}. */
+    public VectorMapper(BufferAllocator allocator, RowSetSchema schema) {
+      super(allocator, schema);
+    }
+    /** Walks the vectors already in {@code container}, sorting them into the map and non-map arrays; returns the non-map array. */
+    public ValueVector[] mapContainer(VectorContainer container) {
+      for (VectorWrapper<?> w : container) {
+        @SuppressWarnings("resource")
+        ValueVector v = w.getValueVector();
+        if (v.getField().getType().getMinorType() == MinorType.MAP) {
+          MapVector mv = (MapVector) v;
+          mapVectors[mapIndex++] = mv;
+          buildMap(mv);
+        } else {
+          valueVectors[vectorIndex++] = v;
+        }
+      }
+      return valueVectors;
+    }
+    /** Sorts the members of {@code mapVector} the same way, recursing into nested maps. */
+    private void buildMap(MapVector mapVector) {
+      for (ValueVector v : mapVector) {
+        if (v.getField().getType().getMinorType() == MinorType.MAP) {
+          MapVector mv = (MapVector) v;
+          mapVectors[mapIndex++] = mv;
+          buildMap(mv);
+        } else {
+          valueVectors[vectorIndex++] = v;
+        }
+      }
+    }
+  }
+
+  /**
+   * Flattened representation of value vectors using a depth-first
+   * traversal of maps. Order of vectors here corresponds to the column
+   * indexes used to access columns in a reader or writer.
+   */
+
+  protected final ValueVector[] valueVectors;
+
+  public AbstractSingleRowSet(BufferAllocator allocator, BatchSchema schema) {
+    super(allocator, schema, new VectorContainer());
+    valueVectors = new VectorBuilder(allocator, super.schema).buildContainer(container);
+  }
+
+  public AbstractSingleRowSet(BufferAllocator allocator, VectorContainer container) {
+    super(allocator, container.getSchema(), container);
+    valueVectors = new VectorMapper(allocator, super.schema).mapContainer(container);
+  }
+
+  public AbstractSingleRowSet(AbstractSingleRowSet rowSet) { // builds a second row set over the same data
+    super(rowSet.allocator, rowSet.schema.batch(), rowSet.container); // same allocator and container as the source
+    valueVectors = rowSet.valueVectors; // the vector array is shared, not copied
+  }
+
+  @Override
+  public ValueVector[] vectors() { return valueVectors; }
+
+  @Override
+  public int size() {
+    RecordBatchSizer sizer = new RecordBatchSizer(container);
+    return sizer.actualSize();
+  }
+
+  /**
+   * Internal method to build the set of column readers needed for
+   * this row set and to wrap them in a row set reader.
+   * @param rowIndex object that points to the current row
+   * @return a row set reader whose column readers appear in the same
+   * order as the (non-map) vectors
+   */
+
+  protected RowSetReader buildReader(RowSetIndex rowIndex) {
+    FlattenedSchema accessSchema = schema().flatAccess();
+    ValueVector[] valueVectors = vectors(); // same array as the valueVectors field, which this local shadows
+    AbstractColumnReader[] readers = new AbstractColumnReader[valueVectors.length];
+    for (int i = 0; i < readers.length; i++) {
+      MinorType type = accessSchema.column(i).getType().getMinorType();
+      if (type == MinorType.MAP) {
+        readers[i] = null; // buildMapAccessor(i);
+      } else if (type == MinorType.LIST) {
+        readers[i] = null; // buildListAccessor(i);
+      } else {
+        readers[i] = ColumnAccessorFactory.newReader(valueVectors[i].getField().getType());
+        readers[i].bind(rowIndex, valueVectors[i]); // each reader is bound to the same row index object
+      }
+    }
+    return new RowSetReaderImpl(accessSchema, rowIndex, readers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
new file mode 100644
index 0000000..706db27
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnWriter;
+import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
+import org.apache.drill.exec.vector.accessor.impl.TupleWriterImpl;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+
+/**
+ * Implementation of a single row set with no indirection (selection)
+ * vector.
+ */
+
+public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowSet {
+
+  /**
+   * Reader index that points directly to each row in the row set.
+   * This index starts with pointing to the -1st row, so that the
+   * reader can require a <tt>next()</tt> for every row, including
+   * the first. (This is the JDBC ResultSet convention.)
+   */
+
+  private static class DirectRowIndex extends BoundedRowIndex {
+
+    public DirectRowIndex(int rowCount) {
+      super(rowCount);
+    }
+    /** With no indirection vector, the data row is the position itself. */
+    @Override
+    public int index() { return rowIndex; }
+    /** Always 0. */
+    @Override
+    public int batch() { return 0; }
+  }
+
+  /**
+   * Writer index that points to each row in the row set. The index starts at
+   * the 0th row and advances one row on each increment. This allows writers to
+   * start positioned at the first row. Writes happen in the current row.
+   * Calling <tt>next()</tt> advances to the next position, effectively saving
+   * the current row. The most recent row can be abandoned easily simply by not
+   * calling <tt>next()</tt>. This means that the number of completed rows is
+   * the same as the row index.
+   */
+
+  private static class ExtendableRowIndex extends RowSetIndex {
+    /** Largest number of rows that may be written. */
+    private final int maxSize;
+    /** Starts at row 0 (not the inherited -1), so the first row is the current row from the start. */
+    public ExtendableRowIndex(int maxSize) {
+      this.maxSize = maxSize;
+      rowIndex = 0;
+    }
+    /** The write position is the data row: there is no indirection. */
+    @Override
+    public int index() { return rowIndex; }
+    /** Moves to the next row. The position may reach {@code maxSize} (then {@code valid()} is false); past that it stays put and false is returned. */
+    @Override
+    public boolean next() {
+      if (++rowIndex <= maxSize ) {
+        return true;
+      } else {
+        rowIndex--;
+        return false;
+      }
+    }
+    /** Number of completed rows, which is the same as the current position. */
+    @Override
+    public int size() { return rowIndex; }
+    /** True while the current position is below {@code maxSize}. */
+    @Override
+    public boolean valid() { return rowIndex < maxSize; }
+    /** Always 0. */
+    @Override
+    public int batch() { return 0; }
+  }
+
+  /**
+   * Implementation of a row set writer. Only available for newly-created,
+   * empty, direct, single row sets. Rewriting is not allowed, nor is writing
+   * to a hyper row set.
+   */
+
+  public class RowSetWriterImpl extends TupleWriterImpl implements RowSetWriter {
+    /** Write position; {@code buildWriter()} binds the column writers to this same index object. */
+    private final ExtendableRowIndex index;
+    private final ExtendableRowSet rowSet; // receives the final row count in done()
+    /** Stores the collaborators and calls {@code start()} before any row is written. */
+    protected RowSetWriterImpl(ExtendableRowSet rowSet, TupleSchema schema, ExtendableRowIndex index, AbstractColumnWriter[] writers) {
+      super(schema, writers);
+      this.rowSet = rowSet;
+      this.index = index;
+      start();
+    }
+    /** Writes {@code values} into columns 0..n-1 of the current row, then saves it. Throws IndexOutOfBoundsException when the index is no longer valid. */
+    @Override
+    public void setRow(Object...values) {
+      if (! index.valid()) {
+        throw new IndexOutOfBoundsException("Write past end of row set");
+      }
+      for (int i = 0; i < values.length;  i++) {
+        set(i, values[i]);
+      }
+      save();
+    }
+    /** Reports the index's {@code valid()} result. */
+    @Override
+    public boolean valid() { return index.valid(); }
+    /** Current write position of the index. */
+    @Override
+    public int index() { return index.position(); }
+    /** Commits the current row by advancing the index (its boolean result is ignored), then calls {@code start()} again. */
+    @Override
+    public void save() {
+      index.next();
+      start();
+    }
+    /** Passes the index's size (the number of saved rows) to the row set as its row count. */
+    @Override
+    public void done() {
+      rowSet.setRowCount(index.size());
+    }
+  }
+
+  public DirectRowSet(BufferAllocator allocator, BatchSchema schema) {
+    super(allocator, schema);
+  }
+
+  public DirectRowSet(BufferAllocator allocator, VectorContainer container) {
+    super(allocator, container);
+  }
+
+  public DirectRowSet(BufferAllocator allocator, VectorAccessible va) {
+    super(allocator, toContainer(va, allocator));
+  }
+
+  private static VectorContainer toContainer(VectorAccessible va, BufferAllocator allocator) {
+    VectorContainer container = VectorContainer.getTransferClone(va, allocator); // NOTE(review): presumably moves va's buffers into the new container — confirm
+    container.buildSchema(SelectionVectorMode.NONE); // schema is built with no selection vector
+    container.setRecordCount(va.getRecordCount()); // carry the source's record count over
+    return container;
+  }
+
+  @Override
+  public void allocate(int recordCount) {
+    for (final ValueVector v : valueVectors) { // the flattened non-map vectors
+      AllocationHelper.allocate(v, recordCount, 50, 10); // NOTE(review): 50 and 10 are fixed sizing hints, presumably bytes per variable-width value and elements per repeated value — confirm against AllocationHelper
+    }
+  }
+
+  @Override
+  public void setRowCount(int rowCount) {
+    container.setRecordCount(rowCount);
+    VectorAccessibleUtilities.setValueCount(container, rowCount);
+  }
+
+  @Override
+  public RowSetWriter writer() {
+    return writer(10);
+  }
+
+  @Override
+  public RowSetWriter writer(int initialRowCount) {
+    if (container.hasRecordCount()) { // presumably true once a record count has been set on the container — confirm
+      throw new IllegalStateException("Row set already contains data");
+    }
+    allocate(initialRowCount); // initial vector allocation only; the row limit is set on the next line
+    return buildWriter(new ExtendableRowIndex(Character.MAX_VALUE)); // Character.MAX_VALUE = 65535 rows at most
+  }
+
+  /**
+   * Build writer objects for each column based on the column type.
+   *
+   * @param rowIndex the index which points to each row
+   * @return a row set writer wrapping one column writer per vector
+   */
+
+  protected RowSetWriter buildWriter(ExtendableRowIndex rowIndex) {
+    ValueVector[] valueVectors = vectors();
+    AbstractColumnWriter[] writers = new AbstractColumnWriter[valueVectors.length];
+    for (int i = 0; i < writers.length; i++) {
+      writers[i] = ColumnAccessorFactory.newWriter(valueVectors[i].getField().getType());
+      writers[i].bind(rowIndex, valueVectors[i]); // each writer is bound to the same row index object
+    }
+    TupleSchema accessSchema = schema().hierarchicalAccess(); // NOTE(review): the reader side uses flatAccess(); confirm both views agree on column count when maps are present
+    return new RowSetWriterImpl(this, accessSchema, rowIndex, writers);
+  }
+
+  @Override
+  public RowSetReader reader() {
+    return buildReader(new DirectRowIndex(rowCount()));
+  }
+
+  @Override
+  public boolean isExtendable() { return true; }
+
+  @Override
+  public boolean isWritable() { return true; }
+
+  @Override
+  public SelectionVectorMode indirectionType() { return SelectionVectorMode.NONE; }
+
+  @Override
+  public SingleRowSet toIndirect() {
+    return new IndirectRowSet(this);
+  }
+
+  @Override
+  public SelectionVector2 getSv2() { return null; }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
new file mode 100644
index 0000000..9df0b23
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.AccessorUtilities;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader.VectorAccessor;
+import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.test.rowSet.RowSet.HyperRowSet;
+import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
+import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn;
+import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
+
+/**
+ * Implements a row set wrapper around a collection of "hyper vectors."
+ * A hyper-vector is a logical vector formed by a series of physical vectors
+ * stacked on top of one another. To make a row set, we have a hyper-vector
+ * for each column. Another way to visualize this is as a "hyper row set":
+ * a stacked collection of single row sets: each column is represented by a
+ * vector per row set, with each vector in a row set having the same number
+ * of rows. An SV4 then provides a uniform index into the rows in the
+ * hyper set. A hyper row set is read-only.
+ */
+
+public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
+
+  /**
+   * Read-only row index into the hyper row set with batch and index
+   * values mapping via an SV4.
+   */
+
+  public static class HyperRowIndex extends BoundedRowIndex {
+    /** Selection vector whose entry at each position is decoded into a batch and an index. */
+    private final SelectionVector4 sv4;
+    /** Bounds the index by the selection vector's count. */
+    public HyperRowIndex(SelectionVector4 sv4) {
+      super(sv4.getCount());
+      this.sv4 = sv4;
+    }
+    /** Index part of the SV4 entry at the current position. NOTE(review): not guarded for the initial position of -1. */
+    @Override
+    public int index() {
+      return AccessorUtilities.sv4Index(sv4.get(rowIndex));
+    }
+    /** Batch part of the SV4 entry at the current position. */
+    @Override
+    public int batch( ) {
+      return AccessorUtilities.sv4Batch(sv4.get(rowIndex));
+    }
+  }
+
+  /**
+   * Vector accessor used by the column accessors to obtain the vector for
+   * each column value. That is, position 0 might be batch 4, index 3,
+   * while position 1 might be batch 1, index 7, and so on.
+   */
+
+  public static class HyperVectorAccessor implements VectorAccessor {
+    /** Index whose current batch selects the vector. */
+    private final HyperRowIndex rowIndex;
+    private final ValueVector[] vectors; // the wrapper's vectors, looked up by batch number
+    /** Captures the wrapper's vector array once, at construction. */
+    public HyperVectorAccessor(HyperVectorWrapper<ValueVector> hvw, HyperRowIndex rowIndex) {
+      this.rowIndex = rowIndex;
+      vectors = hvw.getValueVectors();
+    }
+    /** Returns the vector for the batch the row index currently points at. */
+    @Override
+    public ValueVector vector() {
+      return vectors[rowIndex.batch()];
+    }
+  }
+
+  /**
+   * Build a hyper row set by restructuring a hyper vector bundle into a uniform
+   * shape. Consider this schema: <pre><code>
+   * { a: 10, b: { c: 20, d: { e: 30 } } }</code></pre>
+   * <p>
+   * The hyper container, with two batches, has this structure:
+   * <table border="1">
+   * <tr><th>Batch</th><th>a</th><th>b</th></tr>
+   * <tr><td>0</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></tr>
+   * <tr><td>1</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></tr>
+   * </table>
+   * <p>
+   * The above table shows that top-level scalar vectors (such as the Int Vector for column
+   * a) appear "end-to-end" as a hyper-vector. Maps also appear end-to-end. But, the
+   * contents of the map (column c) do not appear end-to-end. Instead, they appear as
+   * contents in the map vector. To get to c, one indexes into the map vector, steps inside
+   * the map to find c and indexes to the right row.
+   * <p>
+   * Similarly, the maps for d do not appear end-to-end, one must step to the right batch
+   * in b, then step to d.
+   * <p>
+   * Finally, to get to e, one must step
+   * into the hyper vector for b, then step to the proper batch, step to d, step to e
+   * and finally step to the row within e. This is a very complex, costly indexing scheme
+   * that differs depending on map nesting depth.
+   * <p>
+   * To simplify access, this class restructures the maps to flatten the scalar vectors
+   * into end-to-end hyper vectors. For example, for the above:
+   * <p>
+   * <table border="1">
+   * <tr><th>Batch</th><th>a</th><th>c</th><th>e</th></tr>
+   * <tr><td>0</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></tr>
+   * <tr><td>1</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></tr>
+   * </table>
+   *
+   * The maps are still available as hyper vectors, but separated into map fields.
+   * (Scalar access no longer needs to access the maps.) The result is a uniform
+   * addressing scheme for both top-level and nested vectors.
+   */
+
+  public static class HyperVectorBuilder {
+    /** Hyper vectors for the non-map columns; this is the array {@code mapContainer()} returns. */
+    protected final HyperVectorWrapper<?> valueVectors[];
+    protected final HyperVectorWrapper<AbstractMapVector> mapVectors[]; // filled only from top-level map columns; null when the schema has no maps
+    private final List<ValueVector> nestedScalars[]; // by access index: the collected vectors of one scalar nested in a map; null when no maps
+    private int vectorIndex; // next free valueVectors slot for top-level non-map columns
+    private int mapIndex; // next free mapVectors slot
+    private final PhysicalSchema physicalSchema;
+    /** Sizes the arrays from {@code schema}; the map-related arrays exist only when the schema contains maps. */
+    @SuppressWarnings("unchecked")
+    public HyperVectorBuilder(RowSetSchema schema) {
+      physicalSchema = schema.physical();
+      FlattenedSchema flatSchema = schema.flatAccess();
+      valueVectors = new HyperVectorWrapper<?>[schema.hierarchicalAccess().count()]; // NOTE(review): sized from hierarchicalAccess() but indexed up to flatAccess().count() in buildNestedHyperVectors() — confirm the counts agree
+      if (flatSchema.mapCount() == 0) {
+        mapVectors = null;
+        nestedScalars = null;
+      } else {
+        mapVectors = (HyperVectorWrapper<AbstractMapVector>[])
+            new HyperVectorWrapper<?>[flatSchema.mapCount()];
+        nestedScalars = new ArrayList[flatSchema.count()];
+      }
+    }
+    /** Sorts the container's hyper vectors into the map and non-map arrays, gathers scalars nested in maps, and returns the non-map array. */
+    @SuppressWarnings("unchecked")
+    public HyperVectorWrapper<ValueVector>[] mapContainer(VectorContainer container) {
+      int i = 0; // position of the current top-level column
+      for (VectorWrapper<?> w : container) {
+        HyperVectorWrapper<?> hvw = (HyperVectorWrapper<?>) w;
+        if (w.getField().getType().getMinorType() == MinorType.MAP) {
+          HyperVectorWrapper<AbstractMapVector> mw = (HyperVectorWrapper<AbstractMapVector>) hvw;
+          mapVectors[mapIndex++] = mw;
+          buildHyperMap(physicalSchema.column(i).mapSchema(), mw);
+        } else {
+          valueVectors[vectorIndex++] = hvw;
+        }
+        i++;
+      }
+      if (nestedScalars != null) {
+        buildNestedHyperVectors();
+      }
+      return (HyperVectorWrapper<ValueVector>[]) valueVectors;
+    }
+    /** Prepares empty lists for the scalars under {@code mapSchema}, then collects them from every map vector in the wrapper. */
+    private void buildHyperMap(PhysicalSchema mapSchema, HyperVectorWrapper<AbstractMapVector> mapWrapper) {
+      createHyperVectors(mapSchema);
+      for (AbstractMapVector mapVector : mapWrapper.getValueVectors()) {
+        buildMap(mapSchema, mapVector);
+      }
+    }
+    /** Appends each scalar member of {@code mapVector} to the list at its access index, recursing into nested maps. */
+    private void buildMap(PhysicalSchema mapSchema, AbstractMapVector mapVector) {
+      for (ValueVector v : mapVector) {
+        LogicalColumn col = mapSchema.column(v.getField().getName()); // members are matched to the schema by name
+        if (col.isMap()) {
+          buildMap(col.mapSchema, (AbstractMapVector) v); // nested maps are traversed but not recorded in mapVectors
+        } else {
+          nestedScalars[col.accessIndex()].add(v);
+        }
+      }
+    }
+    /** Creates an empty list at the access index of every scalar under {@code mapSchema}, nested maps included. */
+    private void createHyperVectors(PhysicalSchema mapSchema) {
+      for (int i = 0; i < mapSchema.count(); i++) {
+        LogicalColumn col = mapSchema.column(i);
+        if (col.isMap()) {
+          createHyperVectors(col.mapSchema);
+        } else {
+          nestedScalars[col.accessIndex()] = new ArrayList<ValueVector>();
+        }
+      }
+    }
+    /** Wraps each collected list of nested scalars in a hyper vector stored at the same index in {@code valueVectors}. */
+    private void buildNestedHyperVectors() {
+      for (int i = 0;  i < nestedScalars.length; i++) {
+        if (nestedScalars[i] == null) { // no list was created for this index
+          continue;
+        }
+        ValueVector vectors[] = new ValueVector[nestedScalars[i].size()];
+        nestedScalars[i].toArray(vectors);
+        assert valueVectors[i] == null; // the slot must not already hold a vector
+        valueVectors[i] = new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors, false); // NOTE(review): vectors[0] assumes at least one vector was collected
+      }
+    }
+  }
+
+  /**
+   * Selection vector that indexes into the hyper vectors.
+   */
+  private final SelectionVector4 sv4;
+  /**
+   * Collection of hyper vectors in flattened order: a left-to-right,
+   * depth first ordering of vectors in maps. Order here corresponds to
+   * the order used for column indexes in the row set reader.
+   */
+  private final HyperVectorWrapper<ValueVector> hvw[];
+
+  public HyperRowSetImpl(BufferAllocator allocator, VectorContainer container, SelectionVector4 sv4) {
+    super(allocator, container.getSchema(), container);
+    this.sv4 = sv4;
+    hvw = new HyperVectorBuilder(schema).mapContainer(container);
+  }
+
+  @Override
+  public boolean isExtendable() { return false; }
+
+  @Override
+  public boolean isWritable() { return false; }
+
+  @Override
+  public RowSetWriter writer() {
+    throw new UnsupportedOperationException("Cannot write to a hyper vector");
+  }
+
+  @Override
+  public RowSetReader reader() {
+    return buildReader(new HyperRowIndex(sv4));
+  }
+
+  /**
+   * Internal method to build the set of column readers needed for
+   * this row set and to wrap them in a row set reader.
+   * @param rowIndex object that points to the current row
+   * @return a row set reader with one column reader per column of the
+   * flattened schema
+   */
+
+  protected RowSetReader buildReader(HyperRowIndex rowIndex) {
+    FlattenedSchema accessSchema = schema().flatAccess();
+    AbstractColumnReader readers[] = new AbstractColumnReader[accessSchema.count()];
+    for (int i = 0; i < readers.length; i++) {
+      MaterializedField field = accessSchema.column(i);
+      readers[i] = ColumnAccessorFactory.newReader(field.getType());
+      HyperVectorWrapper<ValueVector> hvw = getHyperVector(i); // local hvw shadows the field of the same name
+      readers[i].bind(rowIndex, field, new HyperVectorAccessor(hvw, rowIndex)); // the accessor picks the vector for the index's current batch
+    }
+    return new RowSetReaderImpl(accessSchema, rowIndex, readers);
+  }
+
+  @Override
+  public SelectionVectorMode indirectionType() { return SelectionVectorMode.FOUR_BYTE; }
+
+  @Override
+  public SelectionVector4 getSv4() { return sv4; }
+
+  @Override
+  public HyperVectorWrapper<ValueVector> getHyperVector(int i) { return hvw[i]; }
+
+  @Override
+  public int rowCount() { return sv4.getCount(); }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
new file mode 100644
index 0000000..f90fbb7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+/**
+ * Single row set coupled with an indirection (selection) vector,
+ * specifically an SV2.
+ */
+
+public class IndirectRowSet extends AbstractSingleRowSet {
+
+  /**
+   * Row index that reaches each row through the selection vector rather
+   * than directly. {@code index()} translates the current position into
+   * the data row number stored in the SV2, so the position advances in
+   * order while the resulting index may jump around.
+   */
+
+  private static class IndirectRowIndex extends BoundedRowIndex {
+
+    private final SelectionVector2 sv2;
+
+    /**
+     * @param sv2 selection vector to index through; its count bounds
+     * this index
+     */
+    public IndirectRowIndex(SelectionVector2 sv2) {
+      super(sv2.getCount());
+      this.sv2 = sv2;
+    }
+
+    /**
+     * @return the SV2 entry for the current position
+     */
+    @Override
+    public int index() {
+      return sv2.getIndex(rowIndex);
+    }
+
+    /**
+     * @return always 0
+     */
+    @Override
+    public int batch() {
+      return 0;
+    }
+  }
+
+  private final SelectionVector2 sv2;
+
+  /**
+   * Wraps a container, creating a new SV2 that maps each position to the
+   * row with the same number.
+   *
+   * @param allocator allocator used for the new SV2
+   * @param container vectors that make up the row set
+   */
+  public IndirectRowSet(BufferAllocator allocator, VectorContainer container) {
+    this(allocator, container, makeSv2(allocator, container));
+  }
+
+  /**
+   * Wraps a container together with an existing SV2.
+   *
+   * @param allocator allocator passed to the superclass
+   * @param container vectors that make up the row set
+   * @param sv2 selection vector to use for this row set
+   */
+  public IndirectRowSet(BufferAllocator allocator, VectorContainer container, SelectionVector2 sv2) {
+    super(allocator, container);
+    this.sv2 = sv2;
+  }
+
+  /**
+   * Builds an SV2 in which entry {@code i} holds {@code i}, sized to the
+   * record count of the container, then rebuilds the container schema in
+   * {@code TWO_BYTE} mode.
+   *
+   * @param allocator allocator used for the SV2 buffer
+   * @param container container whose record count sizes the SV2
+   * @return the populated selection vector
+   * @throws OutOfMemoryException if the SV2 buffer cannot be allocated
+   */
+  private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContainer container) {
+    final int recordCount = container.getRecordCount();
+    final SelectionVector2 identity = new SelectionVector2(allocator);
+    if (!identity.allocateNewSafe(recordCount)) {
+      throw new OutOfMemoryException("Unable to allocate sv2 buffer");
+    }
+    // NOTE(review): the cast to char wraps for row numbers above 65535;
+    // presumably callers never pass larger batches -- confirm.
+    int row = 0;
+    while (row < recordCount) {
+      identity.setIndex(row, (char) row);
+      row++;
+    }
+    identity.setRecordCount(recordCount);
+    container.buildSchema(SelectionVectorMode.TWO_BYTE);
+    return identity;
+  }
+
+  /**
+   * Creates an indirect row set from a direct one, adding a new SV2 that
+   * maps each position to the row with the same number.
+   *
+   * @param directRowSet the direct row set passed to the superclass
+   */
+  public IndirectRowSet(DirectRowSet directRowSet) {
+    super(directRowSet);
+    sv2 = makeSv2(allocator, container);
+  }
+
+  /**
+   * @return the SV2 of this row set
+   */
+  @Override
+  public SelectionVector2 getSv2() {
+    return sv2;
+  }
+
+  /**
+   * Clears the superclass state first, then the selection vector.
+   */
+  @Override
+  public void clear() {
+    super.clear();
+    getSv2().clear();
+  }
+
+  /**
+   * Writing is not supported for this row set: always throws.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public RowSetWriter writer() {
+    throw new UnsupportedOperationException("Cannot write to an existing row set");
+  }
+
+  /**
+   * @return a new reader that visits rows through the SV2
+   */
+  @Override
+  public RowSetReader reader() {
+    IndirectRowIndex sv2Index = new IndirectRowIndex(getSv2());
+    return buildReader(sv2Index);
+  }
+
+  /**
+   * @return always {@code false}
+   */
+  @Override
+  public boolean isExtendable() {
+    return false;
+  }
+
+  /**
+   * NOTE(review): reports {@code true} although {@link #writer()} always
+   * throws -- confirm which of the two is intended.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isWritable() {
+    return true;
+  }
+
+  /**
+   * @return always {@code SelectionVectorMode.TWO_BYTE}
+   */
+  @Override
+  public SelectionVectorMode indirectionType() {
+    return SelectionVectorMode.TWO_BYTE;
+  }
+
+  /**
+   * @return this row set, which is already indirect
+   */
+  @Override
+  public SingleRowSet toIndirect() {
+    return this;
+  }
+
+  /**
+   * Computes the size using a {@code RecordBatchSizer} built over the
+   * container and the SV2.
+   *
+   * @return the actual size reported by the sizer
+   */
+  @Override
+  public int size() {
+    return new RecordBatchSizer(container, sv2).actualSize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
new file mode 100644
index 0000000..d22139c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.TupleReader;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * A row set is a collection of rows stored as value vectors. Elsewhere in
+ * Drill we call this a "record batch", but that term has been overloaded to
+ * mean the runtime implementation of an operator...
+ * <p>
+ * A row set encapsulates a set of vectors and provides access to Drill's
+ * various "views" of vectors: {@link VectorContainer},
+ * {@link VectorAccessible}, etc.
+ * <p>
+ * A row set is defined by a {@link RowSetSchema}. For testing purposes, a row
+ * set has a fixed schema; we don't allow changing the set of vectors
+ * dynamically.
+ * <p>
+ * The row set also provides a simple way to write and read records using the
+ * {@link RowSetWriter} and {@link RowSetReader} interfaces. As per Drill
+ * conventions, a row set can be written (once), read many times, and finally
+ * cleared.
+ * <p>
+ * Drill provides a large number of vector (data) types. Each requires a
+ * type-specific way to set data. The row set writer uses a {@link ColumnWriter}
+ * to set each value in a way unique to the specific data type. Similarly, the
+ * row set reader provides a {@link ColumnReader} interface. In both cases,
+ * columns can be accessed by index number (as defined in the schema) or
+ * by name.
+ * <p>
+ * A row set follows a schema. The schema starts as a
+ * {@link BatchSchema}, but is parsed and restructured into a variety of
+ * forms. In the original form, maps contain their value vectors. In the
+ * flattened form, all vectors for all maps (and the top-level tuple) are
+ * collected into a single structure. Since this structure is for testing,
+ * this somewhat-static structure works just fine; we don't need the added
+ * complexity that comes from building the schema and data dynamically.
+ * <p>
+ * Putting this all together, the typical life-cycle flow is:
+ * <ul>
+ * <li>Define the schema using {@link RowSetSchema#builder()}.</li>
+ * <li>Create the row set from the schema.</li>
+ * <li>Populate the row set using a writer from {@link ExtendableRowSet#writer(int)}.</li>
+ * <li>Optionally add a selection vector: {@link SingleRowSet#toIndirect()}.</li>
+ * <li>Process the vector container using the code under test.</li>
+ * <li>Retrieve the results using a reader from {@link #reader()}.</li>
+ * <li>Dispose of vector memory with {@link #clear()}.</li>
+ * </ul>
+ */
+
+public interface RowSet {
+
+  /**
+   * Interface for writing values to a row set. Only available
+   * for newly-created, single, direct row sets. Eventually, if
+   * we want to allow updating a row set, we have to create a
+   * new row set with the updated columns, then merge the new
+   * and old row sets to create a new immutable row set.
+   */
+
+  public interface RowSetWriter extends TupleWriter {
+    // Sets the columns of a row from a variable-length list of values.
+    void setRow(Object...values);
+    // NOTE(review): presumably reports whether the writer still points at a
+    // writable row -- confirm against the implementation.
+    boolean valid();
+    // NOTE(review): presumably the index of the row being written -- confirm.
+    int index();
+    // NOTE(review): presumably commits the current row -- confirm.
+    void save();
+    // Signals that writing is complete.
+    void done();
+  }
+
+  /**
+   * Reader for all types of row sets.
+   */
+
+  public interface RowSetReader extends TupleReader {
+
+    /**
+     * Total number of rows in the row set.
+     * @return total number of rows
+     */
+    int size();
+
+    // Advances to the next row. NOTE(review): presumably returns false once
+    // no rows remain -- confirm against the implementation.
+    boolean next();
+    // NOTE(review): presumably the reader's position within the row set, as
+    // opposed to rowIndex() below -- confirm.
+    int index();
+    // NOTE(review): presumably repositions the reader at the given
+    // index -- confirm.
+    void set(int index);
+
+    /**
+     * Batch index: 0 for a single batch, or the batch that holds the
+     * current row if this is a hyper-batch.
+     * @return index of the batch for the current row
+     */
+    int batchIndex();
+
+    /**
+     * The index of the underlying row which may be indexed by an
+     * Sv2 or Sv4.
+     *
+     * @return index of the underlying row
+     */
+
+    int rowIndex();
+    // NOTE(review): presumably reports whether the reader points at a
+    // readable row -- confirm.
+    boolean valid();
+  }
+
+  // NOTE(review): presumably true only for row sets that implement
+  // ExtendableRowSet -- confirm.
+  boolean isExtendable();
+
+  // NOTE(review): presumably indicates whether writer() is usable -- confirm.
+  boolean isWritable();
+
+  // The row set's vectors viewed as a VectorAccessible.
+  VectorAccessible vectorAccessible();
+
+  // The row set's vectors viewed as a VectorContainer.
+  VectorContainer container();
+
+  // Number of rows in this row set.
+  int rowCount();
+
+  // Creates a writer for this row set.
+  RowSetWriter writer();
+
+  // Creates a reader for this row set.
+  RowSetReader reader();
+
+  // Disposes of the vector memory held by this row set.
+  void clear();
+
+  // Schema of this row set.
+  RowSetSchema schema();
+
+  // Allocator associated with this row set.
+  BufferAllocator allocator();
+
+  // Kind of selection vector, if any, used by this row set.
+  SelectionVectorMode indirectionType();
+
+  // NOTE(review): presumably prints the row set for debugging -- confirm.
+  void print();
+
+  /**
+   * Return the size in memory of this record set, including indirection
+   * vectors, null vectors, offset vectors and the entire (used and unused)
+   * data vectors.
+   *
+   * @return memory size in bytes
+   */
+
+  int size();
+
+  // Schema of this row set expressed as a BatchSchema.
+  BatchSchema batchSchema();
+
+  /**
+   * Row set that manages a single batch of rows.
+   */
+
+  public interface SingleRowSet extends RowSet {
+    // The value vectors of this row set.
+    ValueVector[] vectors();
+    // A view of this row set that uses a selection vector (SV2).
+    SingleRowSet toIndirect();
+    // The SV2 of this row set.
+    SelectionVector2 getSv2();
+  }
+
+  /**
+   * Single row set which is empty and allows writing.
+   * Once writing is complete, the row set becomes an
+   * immutable direct row set.
+   */
+
+  public interface ExtendableRowSet extends SingleRowSet {
+    // NOTE(review): presumably allocates vectors for the given number of
+    // records -- confirm.
+    void allocate(int recordCount);
+    // Sets the number of rows in this row set.
+    void setRowCount(int rowCount);
+    // Creates a writer given an initial row count.
+    RowSetWriter writer(int initialRowCount);
+  }
+
+  /**
+   * Row set comprised of multiple single row sets, along with
+   * an indirection vector (SV4).
+   */
+
+  public interface HyperRowSet extends RowSet {
+    // The SV4 of this row set.
+    SelectionVector4 getSv4();
+    // The hyper vector at the given position.
+    HyperVectorWrapper<ValueVector> getHyperVector(int i);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
new file mode 100644
index 0000000..a5b03c8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+
+/**
+ * Fluent builder to quickly build up a row set (record batch)
+ * programmatically. Starting with an {@link OperatorFixture}:
+ * <pre><code>
+ * OperatorFixture fixture = ...
+ * RowSet rowSet = fixture.rowSetBuilder(batchSchema)
+ *   .add(10, "string", new double[] {10.3, 10.4})
+ *   ...
+ *   .build();</code></pre>
+ */
+
+public final class RowSetBuilder {
+
+  private DirectRowSet rowSet;
+  private RowSetWriter writer;
+  private boolean withSv2;
+
+  public RowSetBuilder(BufferAllocator allocator, BatchSchema schema) {
+    this(allocator, schema, 10);
+  }
+
+  public RowSetBuilder(BufferAllocator allocator, BatchSchema schema, int capacity) {
+    rowSet = new DirectRowSet(allocator, schema);
+    writer = rowSet.writer(capacity);
+  }
+
+  /**
+   * Add a new row using column values passed as variable-length arguments. Expects
+   * map values to be flattened. a schema of (a:int, b:map(c:varchar)) would be>
+   * set as <br><tt>add(10, "foo");</tt><br> Values of arrays can be expressed as a Java
+   * array. A schema of (a:int, b:int[]) can be set as<br>
+   * <tt>add(10, new int[] {100, 200});</tt><br>
+   * @param values column values in column index order
+   * @return this builder
+   */
+
+  public RowSetBuilder add(Object...values) {
+    writer.setRow(values);
+    return this;
+  }
+
+  /**
+   * Build the row set with a selection vector 2. The SV2 is
+   * initialized to have a 1:1 index to the rows: SV2 0 points
+   * to row 1, SV2 position 1 points to row 1 and so on.
+   *
+   * @return this builder
+   */
+  public RowSetBuilder withSv2() {
+    withSv2 = true;
+    return this;
+  }
+
+  public SingleRowSet build() {
+    writer.done();
+    if (withSv2) {
+      return rowSet.toIndirect();
+    }
+    return rowSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
new file mode 100644
index 0000000..3ba7471
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.exec.vector.accessor.ArrayReader;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+import org.bouncycastle.util.Arrays;
+
+/**
+ * For testing, compare the contents of two row sets (record batches)
+ * to verify that they are identical. Supports masks to exclude certain
+ * columns from comparison.
+ */
+
+public class RowSetComparison {
+
+  private final RowSet expected;
+  private final boolean[] mask;
+  private double delta = 0.001;
+  private int offset;
+  private int span = -1;
+
+  /**
+   * Creates a comparison against the given expected row set. All
+   * columns of the hierarchical schema are compared unless excluded.
+   *
+   * @param expected the row set that holds the expected values
+   */
+  public RowSetComparison(RowSet expected) {
+    this.expected = expected;
+    mask = new boolean[expected.schema().hierarchicalAccess().count()];
+    for (int i = 0; i < mask.length; i++) {
+      mask[i] = true;
+    }
+  }
+
+  /**
+   * Mark a specific column as excluded from comparisons.
+   * @param colNo the index of the column to exclude
+   * @return this builder
+   */
+
+  public RowSetComparison exclude(int colNo) {
+    mask[colNo] = false;
+    return this;
+  }
+
+  /**
+   * Specifies a "selection" mask that determines which columns
+   * to compare. Columns marked as "false" are omitted from the
+   * comparison. Flags are applied to columns in index order; columns
+   * beyond the flags given keep their current setting.
+   *
+   * @param flags variable-length list of column flags
+   * @return this builder
+   */
+  public RowSetComparison withMask(Boolean...flags) {
+    for (int i = 0; i < flags.length; i++) {
+      mask[i] = flags[i];
+    }
+    return this;
+  }
+
+  /**
+   * Specify the delta value to use when comparing float or
+   * double values. The default is 0.001.
+   *
+   * @param delta the delta to use in float and double comparisons
+   * @return this builder
+   */
+  public RowSetComparison withDelta(double delta) {
+    this.delta = delta;
+    return this;
+  }
+
+  /**
+   * Specify an offset into the row sets to start the comparison.
+   * Usually combined with {@link #span(int)}.
+   *
+   * @param offset offset into the row set to start the comparison
+   * @return this builder
+   */
+  public RowSetComparison offset(int offset) {
+    this.offset = offset;
+    return this;
+  }
+
+  /**
+   * Specify a subset of rows to compare. Usually combined
+   * with {@link #offset(int)}.
+   *
+   * @param span the number of rows to compare
+   * @return this builder
+   */
+
+  public RowSetComparison span(int span) {
+    this.span = span;
+    return this;
+  }
+
+  /**
+   * Verify the actual rows using the rules defined in this builder.
+   * Rows before the offset are skipped in both row sets; the following
+   * rows (all remaining rows, or {@code span} rows if a span was set)
+   * are compared column by column.
+   *
+   * @param actual the actual results to verify
+   */
+
+  public void verify(RowSet actual) {
+    int testLength = expected.rowCount() - offset;
+    if (span > -1) {
+      testLength = span;
+    }
+    int dataLength = offset + testLength;
+    assertTrue("Missing expected rows", expected.rowCount() >= dataLength);
+    assertTrue("Missing actual rows", actual.rowCount() >= dataLength);
+    RowSetReader er = expected.reader();
+    RowSetReader ar = actual.reader();
+    // Skip the rows that precede the offset in both row sets.
+    for (int i = 0; i < offset; i++) {
+      er.next();
+      ar.next();
+    }
+    for (int i = 0; i < testLength; i++) {
+      er.next();
+      ar.next();
+      verifyRow(er, ar);
+    }
+  }
+
+  /**
+   * Convenience method to verify the actual results, then free memory
+   * for both the expected and actual result sets. Memory is freed even
+   * if verification fails.
+   *
+   * @param actual the actual results to verify
+   */
+
+  public void verifyAndClear(RowSet actual) {
+    try {
+      verify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  /**
+   * Compares the current row of the two readers, one column at a time,
+   * skipping columns that the mask excludes.
+   *
+   * @param er reader positioned on the expected row
+   * @param ar reader positioned on the actual row
+   */
+  private void verifyRow(RowSetReader er, RowSetReader ar) {
+    for (int i = 0; i < mask.length; i++) {
+      if (! mask[i]) {
+        continue;
+      }
+      ColumnReader ec = er.column(i);
+      ColumnReader ac = ar.column(i);
+      String label = er.index() + ":" + i;
+      assertEquals(label, ec.valueType(), ac.valueType());
+      if (ec.isNull()) {
+        assertTrue(label + " - column not null", ac.isNull());
+        continue;
+      }
+      // The expected column is known to be non-null from here on.
+      assertTrue(label + " - column is null", ! ac.isNull());
+      switch (ec.valueType()) {
+      case BYTES: {
+        // The expected bytes must come from the expected column; reading
+        // both arrays from the actual column would make this check vacuous.
+        byte[] expectedBytes = ec.getBytes();
+        byte[] actualBytes = ac.getBytes();
+        assertEquals(label + " - byte lengths differ", expectedBytes.length, actualBytes.length);
+        assertTrue(label, Arrays.areEqual(expectedBytes, actualBytes));
+        break;
+      }
+      case DOUBLE:
+        assertEquals(label, ec.getDouble(), ac.getDouble(), delta);
+        break;
+      case INTEGER:
+        assertEquals(label, ec.getInt(), ac.getInt());
+        break;
+      case LONG:
+        assertEquals(label, ec.getLong(), ac.getLong());
+        break;
+      case STRING:
+        assertEquals(label, ec.getString(), ac.getString());
+        break;
+      case DECIMAL:
+        assertEquals(label, ec.getDecimal(), ac.getDecimal());
+        break;
+      case PERIOD:
+        assertEquals(label, ec.getPeriod(), ac.getPeriod());
+        break;
+      case ARRAY:
+        verifyArray(label, ec.array(), ac.array());
+        break;
+      default:
+        throw new IllegalStateException( "Unexpected type: " + ec.valueType());
+      }
+    }
+  }
+
+  /**
+   * Compares two array columns: element type, length, then each element.
+   *
+   * @param colLabel label of the column, used in failure messages
+   * @param ea reader for the expected array
+   * @param aa reader for the actual array
+   */
+  private void verifyArray(String colLabel, ArrayReader ea,
+      ArrayReader aa) {
+    assertEquals(colLabel, ea.valueType(), aa.valueType());
+    assertEquals(colLabel, ea.size(), aa.size());
+    for (int i = 0; i < ea.size(); i++) {
+      String label = colLabel + "[" + i + "]";
+      switch (ea.valueType()) {
+      case ARRAY:
+        throw new IllegalStateException("Arrays of arrays not supported yet");
+      case BYTES: {
+        byte[] expectedBytes = ea.getBytes(i);
+        byte[] actualBytes = aa.getBytes(i);
+        assertEquals(label + " - byte lengths differ", expectedBytes.length, actualBytes.length);
+        assertTrue(label, Arrays.areEqual(expectedBytes, actualBytes));
+        break;
+      }
+      case DOUBLE:
+        assertEquals(label, ea.getDouble(i), aa.getDouble(i), delta);
+        break;
+      case INTEGER:
+        assertEquals(label, ea.getInt(i), aa.getInt(i));
+        break;
+      case LONG:
+        assertEquals(label, ea.getLong(i), aa.getLong(i));
+        break;
+      case STRING:
+        assertEquals(label, ea.getString(i), aa.getString(i));
+        break;
+      case DECIMAL:
+        assertEquals(label, ea.getDecimal(i), aa.getDecimal(i));
+        break;
+      case PERIOD:
+        assertEquals(label, ea.getPeriod(i), aa.getPeriod(i));
+        break;
+      default:
+        throw new IllegalStateException( "Unexpected type: " + ea.valueType());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
new file mode 100644
index 0000000..601abb1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.io.PrintStream;
+
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
+import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+
+/**
+ * Print a row set in CSV-like format. Primarily for debugging.
+ */
+
+public class RowSetPrinter {
+  private final RowSet rowSet;
+
+  /**
+   * @param rowSet the row set to print
+   */
+  public RowSetPrinter(RowSet rowSet) {
+    this.rowSet = rowSet;
+  }
+
+  /**
+   * Prints the row set to standard output.
+   */
+  public void print() {
+    print(System.out);
+  }
+
+  /**
+   * Prints a schema line followed by one line per row. Each row line
+   * starts with a row header, then the column values separated by
+   * commas.
+   *
+   * @param out the stream to print to
+   */
+  public void print(PrintStream out) {
+    SelectionVectorMode mode = rowSet.indirectionType();
+    RowSetReader rows = rowSet.reader();
+    int columnCount = rows.schema().count();
+    printSchema(out, mode);
+    while (rows.next()) {
+      printHeader(out, rows, mode);
+      // The first value has no separator; the rest are preceded by one.
+      if (columnCount > 0) {
+        out.print(rows.getAsString(0));
+      }
+      for (int col = 1; col < columnCount; col++) {
+        out.print(", ");
+        out.print(rows.getAsString(col));
+      }
+      out.println();
+    }
+  }
+
+  /**
+   * Prints the heading line: a "#" marker, a legend for the row header
+   * that depends on the selection vector mode, then the last name of
+   * each column of the hierarchical schema.
+   *
+   * @param out the stream to print to
+   * @param selectionMode the indirection type of the row set
+   */
+  private void printSchema(PrintStream out, SelectionVectorMode selectionMode) {
+    out.print("#");
+    switch (selectionMode) {
+    case TWO_BYTE:
+      out.print(" (row #)");
+      break;
+    case FOUR_BYTE:
+      out.print(" (batch #, row #)");
+      break;
+    default:
+      // No legend for other modes.
+      break;
+    }
+    out.print(": ");
+    TupleSchema columns = rowSet.schema().hierarchicalAccess();
+    int col = 0;
+    while (col < columns.count()) {
+      if (col != 0) {
+        out.print(", ");
+      }
+      out.print(columns.column(col).getLastName());
+      col++;
+    }
+    out.println();
+  }
+
+  /**
+   * Prints the header of one row: the reader index, followed by the
+   * row index (and, for an SV4, the batch index) in parentheses when a
+   * selection vector is in use.
+   *
+   * @param out the stream to print to
+   * @param reader the reader positioned on the row to describe
+   * @param selectionMode the indirection type of the row set
+   */
+  private void printHeader(PrintStream out, RowSetReader reader, SelectionVectorMode selectionMode) {
+    out.print(reader.index());
+    switch (selectionMode) {
+    case TWO_BYTE:
+      out.print(" (");
+      out.print(reader.rowIndex());
+      out.print(")");
+      break;
+    case FOUR_BYTE:
+      out.print(" (");
+      out.print(reader.batchIndex());
+      out.print(", ");
+      out.print(reader.rowIndex());
+      out.print(")");
+      break;
+    default:
+      // No extra detail for other modes.
+      break;
+    }
+    out.print(": ");
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
new file mode 100644
index 0000000..55b5f12
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Row set schema presented as a number of distinct "views" for various
+ * purposes:
+ * <ul>
+ * <li>Batch schema: the schema used by a VectorContainer.</li>
+ * <li>Physical schema: the schema expressed as a hierarchy of
+ * tuples with the top tuple representing the row, nested tuples
+ * representing maps.</li>
+ * <li>Access schema: a flattened schema with all scalar columns
+ * at the top level, and with map columns pulled out into a separate
+ * collection. The flattened-scalar view is the one used to write to,
+ * and read from, the row set.</li>
+ * </ul>
+ * Allows easy creation of multiple row sets from the same schema.
+ * Each schema is immutable, which is fine for tests in which we
+ * want known inputs and outputs.
+ */
+
+public class RowSetSchema {
+
+  /**
+   * Logical description of a column. A logical column is a
+   * materialized field. For maps, also includes a logical schema
+   * of the map.
+   */
+
+  public static class LogicalColumn {
+    protected final String fullName;
+    protected final int accessIndex;
+    protected int flatIndex;
+    protected final MaterializedField field;
+
+    /**
+     * Schema of the map. Includes only those fields directly within
+     * the map; does not include fields from nested tuples. Remains
+     * null unless set through the structure update.
+     */
+
+    protected PhysicalSchema mapSchema;
+
+    /**
+     * @param fullName full name of the column
+     * @param accessIndex access index of the column
+     * @param field the materialized field that this column describes
+     */
+    public LogicalColumn(String fullName, int accessIndex, MaterializedField field) {
+      this.field = field;
+      this.accessIndex = accessIndex;
+      this.fullName = fullName;
+    }
+
+    /**
+     * Records the flat index and the map schema of this column.
+     *
+     * @param index the flat index to store
+     * @param children the map schema to store
+     */
+    private void updateStructure(int index, PhysicalSchema children) {
+      this.mapSchema = children;
+      this.flatIndex = index;
+    }
+
+    public int accessIndex() {
+      return accessIndex;
+    }
+
+    public int flatIndex() {
+      return flatIndex;
+    }
+
+    /**
+     * @return true if a map schema has been set for this column
+     */
+    public boolean isMap() {
+      return mapSchema != null;
+    }
+
+    public PhysicalSchema mapSchema() {
+      return mapSchema;
+    }
+
+    public MaterializedField field() {
+      return field;
+    }
+
+    public String fullName() {
+      return fullName;
+    }
+  }
+
+  /**
+   * Implementation of a tuple name space. Tuples allow both indexed and
+   * named access to their members.
+   *
+   * @param <T> the type of object representing each column
+   */
+
+  public static class NameSpace<T> {
+    private final Map<String,Integer> nameSpace = new HashMap<>();
+    private final List<T> columns = new ArrayList<>();
+
+    /**
+     * Appends a column and registers its name.
+     *
+     * @param key name of the column
+     * @param value the column object
+     * @return the index assigned to the column
+     */
+    public int add(String key, T value) {
+      final int position = columns.size();
+      columns.add(value);
+      nameSpace.put(key, position);
+      return position;
+    }
+
+    /**
+     * @param index index of the column
+     * @return the column stored at the given index
+     */
+    public T get(int index) {
+      return columns.get(index);
+    }
+
+    /**
+     * @param key name of the column
+     * @return the column registered under the name, or null if the
+     * name is unknown
+     */
+    public T get(String key) {
+      final int position = getIndex(key);
+      if (position != -1) {
+        return get(position);
+      }
+      return null;
+    }
+
+    /**
+     * @param key name of the column
+     * @return the index registered under the name, or -1 if the name
+     * is unknown
+     */
+    public int getIndex(String key) {
+      final Integer position = nameSpace.get(key);
+      return (position != null) ? position.intValue() : -1;
+    }
+
+    /**
+     * @return the number of columns added so far
+     */
+    public int count() {
+      return columns.size();
+    }
+  }
+
+  /**
+   * Provides a non-flattened, physical view of the schema. The top-level
+   * row includes maps, maps expand to a nested tuple schema. This view
+   * corresponds, more-or-less, to the physical storage of vectors in
+   * a vector accessible or vector container.
+   */
+
+  private static class TupleSchemaImpl implements TupleSchema {
+
+    private NameSpace<LogicalColumn> columns;
+
+    /**
+     * @param ns the name space that holds the columns of this tuple
+     */
+    public TupleSchemaImpl(NameSpace<LogicalColumn> ns) {
+      columns = ns;
+    }
+
+    /**
+     * @return the materialized field of the column at the given index
+     */
+    @Override
+    public MaterializedField column(int index) {
+      LogicalColumn logical = logicalColumn(index);
+      return logical.field();
+    }
+
+    /**
+     * @return the logical column at the given index
+     */
+    public LogicalColumn logicalColumn(int index) {
+      return columns.get(index);
+    }
+
+    /**
+     * @return the materialized field of the named column, or null if
+     * the name is unknown
+     */
+    @Override
+    public MaterializedField column(String name) {
+      LogicalColumn logical = columns.get(name);
+      if (logical == null) {
+        return null;
+      }
+      return logical.field();
+    }
+
+    /**
+     * @return the index of the named column as reported by the name space
+     */
+    @Override
+    public int columnIndex(String name) {
+      return columns.getIndex(name);
+    }
+
+    /**
+     * @return the number of columns in this tuple
+     */
+    @Override
+    public int count() {
+      return columns.count();
+    }
+  }
+
+  /**
+   * Represents the flattened view of the schema used to get and set columns.
+   * Represents a left-to-right, depth-first traversal of the row and map
+   * columns. Holds only materialized vectors (non-maps). For completeness,
+   * provides access to maps also via separate methods, but this is generally
+   * of little use.
+   */
+
+  public static class FlattenedSchema extends TupleSchemaImpl {
+    protected final TupleSchemaImpl maps;
+
+    /**
+     * @param cols name space of the non-map columns; served by the
+     * inherited column methods
+     * @param maps name space of the map columns; served by the
+     * map-specific methods of this class
+     */
+
+    public FlattenedSchema(NameSpace<LogicalColumn> cols, NameSpace<LogicalColumn> maps) {
+      super(cols);
+      this.maps = new TupleSchemaImpl(maps);
+    }
+
+    /** @return the logical column of the map at the given map index */
+    public LogicalColumn logicalMap(int index) {
+      return maps.logicalColumn(index);
+    }
+
+    /** @return the materialized field of the map at the given map index */
+    public MaterializedField map(int index) {
+      return maps.column(index);
+    }
+
+    /** @return the materialized field of the named map, or null if unknown */
+    public MaterializedField map(String name) {
+      return maps.column(name);
+    }
+
+    /** @return the map index of the named map, or -1 if unknown */
+    public int mapIndex(String name) {
+      return maps.columnIndex(name);
+    }
+
+    /** @return the number of maps */
+    public int mapCount() {
+      return maps.count();
+    }
+  }
+
+  /**
+   * Physical schema of a row set showing the logical hierarchy of fields
+   * with map fields as first-class fields. Map members appear as children
+   * under the map, much as they appear in the physical value-vector
+   * implementation.
+   */
+
+  public static class PhysicalSchema {
+    protected final NameSpace<LogicalColumn> schema = new NameSpace<>();
+
+    /** @return the logical column at the given position in this level */
+    public LogicalColumn column(int index) { return schema.get(index); }
+
+    /** @return the logical column with the given name, or null if unknown */
+    public LogicalColumn column(String name) { return schema.get(name); }
+
+    /** @return the number of columns at this level */
+    public int count() {
+      return schema.count();
+    }
+
+    /** @return the name space that holds the columns of this level */
+    public NameSpace<LogicalColumn> nameSpace() {
+      return schema;
+    }
+  }
+
+  private static class SchemaExpander {
+    private final PhysicalSchema physicalSchema;
+    private final NameSpace<LogicalColumn> cols = new NameSpace<>();
+    private final NameSpace<LogicalColumn> maps = new NameSpace<>();
+
+    /**
+     * Expand the batch schema, starting with an empty name prefix.
+     *
+     * @param schema the batch schema to expand
+     */
+
+    public SchemaExpander(BatchSchema schema) {
+      physicalSchema = expand("", schema);
+    }
+
+    /**
+     * Build the physical schema for one level of fields, recursing into
+     * maps. Each field is added to the level's own name space under its
+     * simple name, and to either the flattened column name space or the
+     * flattened map name space under its prefixed name.
+     *
+     * @param prefix text prepended to each field name to form the
+     * flattened name; the parent's flattened name plus a dot for map
+     * members
+     * @param fields the fields of this level
+     * @return the physical schema for this level
+     */
+
+    private PhysicalSchema expand(String prefix, Iterable<MaterializedField> fields) {
+      final PhysicalSchema level = new PhysicalSchema();
+      for (MaterializedField member : fields) {
+        final String fullName = prefix + member.getName();
+        final LogicalColumn logical = new LogicalColumn(fullName, level.count(), member);
+        level.schema.add(member.getName(), logical);
+
+        final int flatIndex;
+        final PhysicalSchema nested;
+        if (member.getType().getMinorType() != MinorType.MAP) {
+          flatIndex = cols.add(fullName, logical);
+          nested = null;
+        } else {
+          // Register the map before descending so that a map precedes
+          // any maps nested inside it.
+          flatIndex = maps.add(fullName, logical);
+          nested = expand(fullName + ".", member.getChildren());
+        }
+        logical.updateStructure(flatIndex, nested);
+      }
+      return level;
+    }
+  }
+
+  private final BatchSchema batchSchema; // schema exactly as passed to the constructor
+  private final TupleSchemaImpl accessSchema; // view over the top-level physical name space
+  private final FlattenedSchema flatSchema; // view over the expander's flattened columns and maps
+  private final PhysicalSchema physicalSchema; // tree built by SchemaExpander
+
+  public RowSetSchema(BatchSchema schema) {
+    batchSchema = schema;
+    SchemaExpander expander = new SchemaExpander(schema);
+    physicalSchema = expander.physicalSchema;
+    accessSchema = new TupleSchemaImpl(physicalSchema.nameSpace());
+    flatSchema = new FlattenedSchema(expander.cols, expander.maps);
+  }
+
+  /**
+   * A hierarchical schema that includes maps, with maps expanding
+   * to a nested tuple schema. Not used at present; this is intended
+   * to be the basis of non-flattened accessors if we find the need.
+   * @return the hierarchical access schema
+   */
+
+  public TupleSchema hierarchicalAccess() { return accessSchema; }
+
+  /**
+   * A flattened (left-to-right, depth-first traversal) of the non-map
+   * columns in the row. Used to define the column indexes in the
+   * get methods for row readers and the set methods for row writers.
+   * @return the flattened access schema
+   */
+
+  public FlattenedSchema flatAccess() { return flatSchema; }
+
+  /**
+   * Internal physical schema in hierarchical order. Mostly used to create
+   * the other schemas, but may be of use in special cases. Has the same
+   * structure as the batch schema, but with additional information.
+   * @return a tree-structured physical schema
+   */
+
+  public PhysicalSchema physical() { return physicalSchema; }
+
+  /**
+   * The batch schema used by the Drill runtime. Represents a tree-structured
+   * list of top-level fields, including maps. Maps contain a nested schema.
+   * @return the batch schema used by the Drill runtime
+   */
+
+  public BatchSchema batch() { return batchSchema; }
+
+  /**
+   * Convert this schema to a new batch schema that includes the specified
+   * selection vector mode.
+   * @param svMode selection vector mode for the new schema
+   * @return the new batch schema
+   */
+
+  public BatchSchema toBatchSchema(SelectionVectorMode svMode) {
+    // Copy the top-level fields in their existing order; only the
+    // selection vector mode differs in the new schema.
+    final List<MaterializedField> topLevel = new ArrayList<>();
+    for (MaterializedField column : batchSchema) {
+      topLevel.add(column);
+    }
+    final BatchSchema converted = new BatchSchema(svMode, topLevel);
+    return converted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
new file mode 100644
index 0000000..261a9c1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.accessor.AccessorUtilities;
+import org.apache.drill.exec.vector.accessor.ColumnAccessor.ValueType;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+/**
+ * Various utilities useful for working with row sets, especially for testing.
+ */
+
+public class RowSetUtilities {
+
+  // Static helpers only; never instantiated.
+  private RowSetUtilities() { }
+
+  /**
+   * Reverse the entries of an SV2 in place by swapping entries pairwise
+   * from the two ends toward the middle. This is a quick and easy way to
+   * reverse the sort order of an expected-value row set.
+   *
+   * @param sv2 the SV2 which is reversed in place
+   */
+
+  public static void reverse(SelectionVector2 sv2) {
+    int low = 0;
+    int high = sv2.getCount() - 1;
+    while (low < high) {
+      final char saved = sv2.getIndex(low);
+      sv2.setIndex(low, sv2.getIndex(high));
+      sv2.setIndex(high, saved);
+      low++;
+      high--;
+    }
+  }
+
+  /**
+   * Set a test data value from an int. When the column writer reports a
+   * PERIOD value type, the column's minor type selects how the int is
+   * turned into an interval. Otherwise the generic accessor utility does
+   * the conversion. The value set here is purely for testing; the mapping
+   * from ints to intervals has no real meaning.
+   *
+   * @param rowWriter the row set writer that supplies the column writer
+   * and the schema
+   * @param index index of the column, used for both the writer lookup and
+   * the schema lookup
+   * @param value the int value to convert and write
+   */
+
+  public static void setFromInt(RowSetWriter rowWriter, int index, int value) {
+    final ColumnWriter colWriter = rowWriter.column(index);
+    if (colWriter.valueType() != ValueType.PERIOD) {
+      AccessorUtilities.setFromInt(colWriter, value);
+      return;
+    }
+    final MinorType columnType = rowWriter.schema().column(index).getType().getMinorType();
+    setPeriodFromInt(colWriter, columnType, value);
+  }
+
+  /**
+   * Ad-hoc, test-only method to set a Period from an integer. Periods are
+   * made up of months and milliseconds. There is no mapping from one to the
+   * other, so a period requires at least two numbers. Still, we are given
+   * just one (typically from a test data generator). The int is "spread"
+   * across the fields of the period according to the interval type:
+   * <ul>
+   * <li>INTERVAL: the value is taken as a millisecond duration.</li>
+   * <li>INTERVALYEAR: the value is split into years (value / 12) and
+   * months (value % 12).</li>
+   * <li>INTERVALDAY: the value is split into seconds (value % 60), minutes
+   * ((value / 60) % 60), and the remaining quotient, which is stored in
+   * the days field.</li>
+   * </ul>
+   * The result has no meaning; the intent is only that it keeps the same
+   * comparison order as the original ints.
+   *
+   * @param writer column writer for a period column
+   * @param minorType the Drill data type
+   * @param value the integer value to apply
+   * @throws IllegalArgumentException if the minor type is not one of the
+   * three interval types
+   */
+
+  public static void setPeriodFromInt(ColumnWriter writer, MinorType minorType,
+      int value) {
+    switch (minorType) {
+    case INTERVAL: {
+      final Period fromMillis = Duration.millis(value).toPeriod();
+      writer.setPeriod(fromMillis);
+      break;
+    }
+    case INTERVALYEAR: {
+      final int years = value / 12;
+      final int months = value % 12;
+      writer.setPeriod(Period.years(years).withMonths(months));
+      break;
+    }
+    case INTERVALDAY: {
+      final int sec = value % 60;
+      final int totalMinutes = value / 60;
+      final int min = totalMinutes % 60;
+      final int days = totalMinutes / 60;
+      writer.setPeriod(Period.days(days).withMinutes(min).withSeconds(sec));
+      break;
+    }
+    default:
+      throw new IllegalArgumentException("Writer is not an interval: " + minorType);
+    }
+  }
+}