Posted to commits@drill.apache.org by am...@apache.org on 2016/12/04 19:21:13 UTC

[1/5] drill git commit: DRILL-4604: Generate warning on Web UI if drillbits version mismatch is detected

Repository: drill
Updated Branches:
  refs/heads/master bcb301c3e -> 68bd27a12


DRILL-4604: Generate warning on Web UI if drillbits version mismatch is detected

close apache/drill#482


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/63ffeff8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/63ffeff8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/63ffeff8

Branch: refs/heads/master
Commit: 63ffeff889e59bfc9ef8d352f200c1e76a92e17a
Parents: bcb301c
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Apr 14 16:47:15 2016 +0000
Committer: Aman Sinha <as...@maprtech.com>
Committed: Fri Dec 2 08:33:06 2016 -0800

----------------------------------------------------------------------
 .../exec/coord/zk/ZKClusterCoordinator.java     |  14 +-
 .../drill/exec/server/rest/DrillRoot.java       | 148 +++++++++++----
 .../drill/exec/service/ServiceEngine.java       |   2 +
 .../drill/exec/store/sys/DrillbitIterator.java  |   2 +
 .../java-exec/src/main/resources/rest/index.ftl |  63 +++++--
 .../work/metadata/TestMetadataProvider.java     |   2 +-
 .../drill/exec/proto/CoordinationProtos.java    | 183 +++++++++++++++++--
 .../exec/proto/SchemaCoordinationProtos.java    |   7 +
 .../exec/proto/beans/DrillbitEndpoint.java      |  22 +++
 protocol/src/main/protobuf/Coordination.proto   |   1 +
 10 files changed, 376 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index 51f75c5..b14a151 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -243,15 +243,13 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
         builder.append("Active drillbit set changed.  Now includes ");
         builder.append(newDrillbitSet.size());
         builder.append(" total bits. New active drillbits:\n");
+        builder.append("Address | User Port | Control Port | Data Port | Version |\n");
         for (DrillbitEndpoint bit: newDrillbitSet) {
-          builder.append('\t');
-          builder.append(bit.getAddress());
-          builder.append(':');
-          builder.append(bit.getUserPort());
-          builder.append(':');
-          builder.append(bit.getControlPort());
-          builder.append(':');
-          builder.append(bit.getDataPort());
+          builder.append(bit.getAddress()).append(" | ");
+          builder.append(bit.getUserPort()).append(" | ");
+          builder.append(bit.getControlPort()).append(" | ");
+          builder.append(bit.getDataPort()).append(" | ");
+          builder.append(bit.getVersion()).append(" |");
           builder.append('\n');
         }
         logger.debug(builder.toString());

http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
index d1513fc..ba0f212 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
@@ -17,8 +17,7 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import java.util.List;
-
+import java.util.Collection;
 import javax.annotation.security.PermitAll;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
@@ -28,14 +27,14 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.SecurityContext;
 import javax.xml.bind.annotation.XmlRootElement;
 
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.proto.CoordinationProtos;
+import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
 import org.apache.drill.exec.work.WorkManager;
 import org.glassfish.jersey.server.mvc.Viewable;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.google.common.collect.Lists;
 
 @Path("/")
 @PermitAll
@@ -48,51 +47,130 @@ public class DrillRoot {
 
   @GET
   @Produces(MediaType.TEXT_HTML)
-  public Viewable getStats() {
-    return ViewableWithPermissions.create(authEnabled.get(), "/rest/index.ftl", sc, getStatsJSON());
+  public Viewable getClusterInfo() {
+    return ViewableWithPermissions.create(authEnabled.get(), "/rest/index.ftl", sc, getClusterInfoJSON());
   }
 
   @GET
-  @Path("/stats.json")
+  @Path("/cluster.json")
   @Produces(MediaType.APPLICATION_JSON)
-  public List<Stat> getStatsJSON() {
-    List<Stat> stats = Lists.newLinkedList();
-    stats.add(new Stat("Number of Drill Bits", work.getContext().getBits().size()));
-    int number = 0;
-    for (CoordinationProtos.DrillbitEndpoint bit : work.getContext().getBits()) {
-      String initialized = bit.isInitialized() ? " initialized" : " not initialized";
-      stats.add(new Stat("Bit #" + number, bit.getAddress() + initialized));
-      ++number;
+  public ClusterInfo getClusterInfoJSON() {
+    final Collection<DrillbitInfo> drillbits = Sets.newTreeSet();
+    final Collection<String> mismatchedVersions = Sets.newTreeSet();
+
+    final DrillbitEndpoint currentDrillbit = work.getContext().getEndpoint();
+    final String currentVersion = currentDrillbit.getVersion();
+
+    for (DrillbitEndpoint endpoint : work.getContext().getBits()) {
+      final DrillbitInfo drillbit = new DrillbitInfo(endpoint,
+              currentDrillbit.equals(endpoint),
+              currentVersion.equals(endpoint.getVersion()));
+      if (!drillbit.isVersionMatch()) {
+        mismatchedVersions.add(drillbit.getVersion());
+      }
+      drillbits.add(drillbit);
     }
-    stats.add(new Stat("Data Port Address", work.getContext().getEndpoint().getAddress() +
-      ":" + work.getContext().getEndpoint().getDataPort()));
-    stats.add(new Stat("User Port Address", work.getContext().getEndpoint().getAddress() +
-      ":" + work.getContext().getEndpoint().getUserPort()));
-    stats.add(new Stat("Control Port Address", work.getContext().getEndpoint().getAddress() +
-      ":" + work.getContext().getEndpoint().getControlPort()));
-    stats.add(new Stat("Maximum Direct Memory", DrillConfig.getMaxDirectMemory()));
-
-    return stats;
+
+    return new ClusterInfo(drillbits, currentVersion, mismatchedVersions);
   }
 
   @XmlRootElement
-  public class Stat {
-    private String name;
-    private Object value;
+  public static class ClusterInfo {
+    private final Collection<DrillbitInfo> drillbits;
+    private final String currentVersion;
+    private final Collection<String> mismatchedVersions;
+
+    @JsonCreator
+    public ClusterInfo(Collection<DrillbitInfo> drillbits,
+                       String currentVersion,
+                       Collection<String> mismatchedVersions) {
+      this.drillbits = Sets.newTreeSet(drillbits);
+      this.currentVersion = currentVersion;
+      this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions);
+    }
+
+    public Collection<DrillbitInfo> getDrillbits() {
+      return Sets.newTreeSet(drillbits);
+    }
+
+    public String getCurrentVersion() {
+      return currentVersion;
+    }
+
+    public Collection<String> getMismatchedVersions() {
+      return Sets.newTreeSet(mismatchedVersions);
+    }
+  }
+
+  public static class DrillbitInfo implements Comparable<DrillbitInfo> {
+    private final String address;
+    private final String userPort;
+    private final String controlPort;
+    private final String dataPort;
+    private final String version;
+    private final boolean current;
+    private final boolean versionMatch;
 
     @JsonCreator
-    public Stat(String name, Object value) {
-      this.name = name;
-      this.value = value;
+    public DrillbitInfo(DrillbitEndpoint drillbit, boolean current, boolean versionMatch) {
+      this.address = drillbit.getAddress();
+      this.userPort = String.valueOf(drillbit.getUserPort());
+      this.controlPort = String.valueOf(drillbit.getControlPort());
+      this.dataPort = String.valueOf(drillbit.getDataPort());
+      this.version = Strings.isNullOrEmpty(drillbit.getVersion()) ? "Undefined" : drillbit.getVersion();
+      this.current = current;
+      this.versionMatch = versionMatch;
+    }
+
+    public String getAddress() {
+      return address;
     }
 
-    public String getName() {
-      return name;
+    public String getUserPort() { return userPort; }
+
+    public String getControlPort() { return controlPort; }
+
+    public String getDataPort() { return dataPort; }
+
+    public String getVersion() {
+      return version;
     }
 
-    public Object getValue() {
-      return value;
+    public boolean isCurrent() {
+      return current;
     }
 
+    public boolean isVersionMatch() {
+      return versionMatch;
+    }
+
+    /**
+     * Method used to sort drillbits. The current drillbit goes first,
+     * followed by drillbits whose version matches the current one, and then
+     * by drillbits with mismatching versions. Matching drillbits are sorted
+     * by address in natural order; mismatching drillbits are sorted by
+     * version and then by address, in natural order.
+     *
+     * @param drillbitToCompare drillbit to compare against
+     * @return -1 if drillbit should be before, 1 if after in list
+     */
+    @Override
+    public int compareTo(DrillbitInfo drillbitToCompare) {
+      if (this.isCurrent()) {
+        return -1;
+      }
+
+      if (drillbitToCompare.isCurrent()) {
+        return 1;
+      }
+
+      if (this.isVersionMatch() == drillbitToCompare.isVersionMatch()) {
+        if (this.version.equals(drillbitToCompare.getVersion())) {
+          return this.address.compareTo(drillbitToCompare.getAddress());
+        }
+        return this.version.compareTo(drillbitToCompare.getVersion());
+      }
+      return this.versionMatch ? -1 : 1;
+    }
   }
+
 }
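
The ordering that the DrillbitInfo.compareTo javadoc above describes can be illustrated
with a small, self-contained sketch. The hosts and versions below are hypothetical, and the
simplified Bit record (Java 16+ syntax, used here only for brevity) stands in for DrillbitInfo:

    // OrderingSketch.java -- illustration only, not part of the commit.
    import java.util.TreeSet;

    public class OrderingSketch {
      record Bit(String address, String version, boolean current, boolean versionMatch)
          implements Comparable<Bit> {
        @Override
        public int compareTo(Bit o) {
          if (current) { return -1; }            // the current drillbit always sorts first
          if (o.current) { return 1; }
          if (versionMatch == o.versionMatch) {
            return version.equals(o.version)
                ? address.compareTo(o.address)   // same version: order by address
                : version.compareTo(o.version);
          }
          return versionMatch ? -1 : 1;          // matching versions before mismatching ones
        }
      }

      public static void main(String[] args) {
        TreeSet<Bit> bits = new TreeSet<>();
        bits.add(new Bit("host3", "1.8.0", false, false));
        bits.add(new Bit("host1", "1.9.0", false, true));
        bits.add(new Bit("host2", "1.9.0", true, true));
        bits.forEach(b -> System.out.println(b.address()));  // prints host2, host1, host3
      }
    }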

http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 9b7b6c7..5cad0d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -158,6 +159,7 @@ public class ServiceEngine implements AutoCloseable {
         .setAddress(address)
         //.setAddress("localhost")
         .setUserPort(userPort)
+        .setVersion(DrillVersionInfo.getVersion())
         .build();
 
     partialEndpoint = controller.start(partialEndpoint);

http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
index 08bc0ac..836d339 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -39,6 +39,7 @@ public class DrillbitIterator implements Iterator<Object> {
     public int control_port;
     public int data_port;
     public boolean current;
+    public String version;
   }
 
   @Override
@@ -55,6 +56,7 @@ public class DrillbitIterator implements Iterator<Object> {
     i.user_port = ep.getUserPort();
     i.control_port = ep.getControlPort();
     i.data_port = ep.getDataPort();
+    i.version = ep.getVersion();
     return i;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/exec/java-exec/src/main/resources/rest/index.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/index.ftl b/exec/java-exec/src/main/resources/rest/index.ftl
index 99e9d8c..3175479 100644
--- a/exec/java-exec/src/main/resources/rest/index.ftl
+++ b/exec/java-exec/src/main/resources/rest/index.ftl
@@ -17,17 +17,58 @@
   <a href="/queries">back</a><br/>
   <div class="page-header">
   </div>
-  <div class="table-responsive">
-    <table class="table table-hover">
-      <tbody>
-        <#list model as stat>
-          <tr>
-            <td style="border:none;"><b>${stat.getName()}</b></td>
-            <td style="border:none; font-family: Courier;">${stat.getValue()}</td>
-          </tr>
-        </#list>
-      </tbody>
-    </table>
+
+  <#if (model.getMismatchedVersions()?size > 0)>
+    <div id="message" class="alert alert-danger alert-dismissable">
+      <button type="button" class="close" data-dismiss="alert" aria-hidden="true">&times;</button>
+      <strong>Drill does not support clusters containing a mix of Drillbit versions.
+          The current drillbit version is ${model.getCurrentVersion()}.
+          One or more drillbits in the cluster have a different version:
+          ${model.getMismatchedVersions()?join(", ")}.
+      </strong>
+    </div>
+  </#if>
+
+  <div class="row">
+    <div class="col-md-12">
+      <h3>Drillbits <span class="label label-primary">${model.getDrillbits()?size}</span></h3>
+      <div class="table-responsive">
+        <table class="table table-hover">
+          <thead>
+            <tr>
+              <th>#</th>
+              <th>Address</th>
+              <th>User Port</th>
+              <th>Control Port</th>
+              <th>Data Port</th>
+              <th>Version</th>
+            </tr>
+          </thead>
+          <tbody>
+            <#assign i = 1>
+            <#list model.getDrillbits() as drillbit>
+              <tr>
+                <td>${i}</td>
+                <td>${drillbit.getAddress()}
+                  <#if drillbit.isCurrent()>
+                    <span class="label label-info">Current</span>
+                  </#if>
+                </td>
+                <td>${drillbit.getUserPort()}</td>
+                <td>${drillbit.getControlPort()}</td>
+                <td>${drillbit.getDataPort()}</td>
+                <td>
+                  <span class="label
+                    <#if drillbit.isVersionMatch()>label-success<#else>label-danger</#if>">
+                    ${drillbit.getVersion()}
+                  </span>
+                </td>
+              </tr>
+              <#assign i = i + 1>
+            </#list>
+          </tbody>
+        </table>
+      </div>
   </div>
 </#macro>
 

http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
index cd59dde..f3bd63a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -242,7 +242,7 @@ public class TestMetadataProvider extends BaseTestQuery {
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<ColumnMetadata> columns = resp.getColumnsList();
-    assertEquals(70, columns.size());
+    assertEquals(71, columns.size());
     // too many records to verify the output.
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java
index 177e560..4fa28df 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java
@@ -86,6 +86,21 @@ public final class CoordinationProtos {
      * <code>optional .exec.Roles roles = 5;</code>
      */
     org.apache.drill.exec.proto.CoordinationProtos.RolesOrBuilder getRolesOrBuilder();
+
+    // optional string version = 6;
+    /**
+     * <code>optional string version = 6;</code>
+     */
+    boolean hasVersion();
+    /**
+     * <code>optional string version = 6;</code>
+     */
+    java.lang.String getVersion();
+    /**
+     * <code>optional string version = 6;</code>
+     */
+    com.google.protobuf.ByteString
+        getVersionBytes();
   }
   /**
    * Protobuf type {@code exec.DrillbitEndpoint}
@@ -171,6 +186,11 @@ public final class CoordinationProtos {
               bitField0_ |= 0x00000010;
               break;
             }
+            case 50: {
+              bitField0_ |= 0x00000020;
+              version_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -324,12 +344,56 @@ public final class CoordinationProtos {
       return roles_;
     }
 
+    // optional string version = 6;
+    public static final int VERSION_FIELD_NUMBER = 6;
+    private java.lang.Object version_;
+    /**
+     * <code>optional string version = 6;</code>
+     */
+    public boolean hasVersion() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional string version = 6;</code>
+     */
+    public java.lang.String getVersion() {
+      java.lang.Object ref = version_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          version_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string version = 6;</code>
+     */
+    public com.google.protobuf.ByteString
+        getVersionBytes() {
+      java.lang.Object ref = version_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        version_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       address_ = "";
       userPort_ = 0;
       controlPort_ = 0;
       dataPort_ = 0;
       roles_ = org.apache.drill.exec.proto.CoordinationProtos.Roles.getDefaultInstance();
+      version_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -358,6 +422,9 @@ public final class CoordinationProtos {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeMessage(5, roles_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBytes(6, getVersionBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -387,6 +454,10 @@ public final class CoordinationProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, roles_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(6, getVersionBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -518,6 +589,8 @@ public final class CoordinationProtos {
           rolesBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000010);
+        version_ = "";
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -570,6 +643,10 @@ public final class CoordinationProtos {
         } else {
           result.roles_ = rolesBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.version_ = version_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -603,6 +680,11 @@ public final class CoordinationProtos {
         if (other.hasRoles()) {
           mergeRoles(other.getRoles());
         }
+        if (other.hasVersion()) {
+          bitField0_ |= 0x00000020;
+          version_ = other.version_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -920,6 +1002,80 @@ public final class CoordinationProtos {
         return rolesBuilder_;
       }
 
+      // optional string version = 6;
+      private java.lang.Object version_ = "";
+      /**
+       * <code>optional string version = 6;</code>
+       */
+      public boolean hasVersion() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional string version = 6;</code>
+       */
+      public java.lang.String getVersion() {
+        java.lang.Object ref = version_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          version_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string version = 6;</code>
+       */
+      public com.google.protobuf.ByteString
+          getVersionBytes() {
+        java.lang.Object ref = version_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          version_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string version = 6;</code>
+       */
+      public Builder setVersion(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000020;
+        version_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string version = 6;</code>
+       */
+      public Builder clearVersion() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        version_ = getDefaultInstance().getVersion();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string version = 6;</code>
+       */
+      public Builder setVersionBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000020;
+        version_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.DrillbitEndpoint)
     }
 
@@ -2419,18 +2575,19 @@ public final class CoordinationProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\022Coordination.proto\022\004exec\"{\n\020DrillbitEn" +
-      "dpoint\022\017\n\007address\030\001 \001(\t\022\021\n\tuser_port\030\002 \001" +
-      "(\005\022\024\n\014control_port\030\003 \001(\005\022\021\n\tdata_port\030\004 " +
-      "\001(\005\022\032\n\005roles\030\005 \001(\0132\013.exec.Roles\"i\n\024Drill" +
-      "ServiceInstance\022\n\n\002id\030\001 \001(\t\022\033\n\023registrat" +
-      "ionTimeUTC\030\002 \001(\003\022(\n\010endpoint\030\003 \001(\0132\026.exe" +
-      "c.DrillbitEndpoint\"\227\001\n\005Roles\022\027\n\tsql_quer" +
-      "y\030\001 \001(\010:\004true\022\032\n\014logical_plan\030\002 \001(\010:\004tru" +
-      "e\022\033\n\rphysical_plan\030\003 \001(\010:\004true\022\033\n\rjava_e" +
-      "xecutor\030\004 \001(\010:\004true\022\037\n\021distributed_cache",
-      "\030\005 \001(\010:\004trueB3\n\033org.apache.drill.exec.pr" +
-      "otoB\022CoordinationProtosH\001"
+      "\n\022Coordination.proto\022\004exec\"\214\001\n\020DrillbitE" +
+      "ndpoint\022\017\n\007address\030\001 \001(\t\022\021\n\tuser_port\030\002 " +
+      "\001(\005\022\024\n\014control_port\030\003 \001(\005\022\021\n\tdata_port\030\004" +
+      " \001(\005\022\032\n\005roles\030\005 \001(\0132\013.exec.Roles\022\017\n\007vers" +
+      "ion\030\006 \001(\t\"i\n\024DrillServiceInstance\022\n\n\002id\030" +
+      "\001 \001(\t\022\033\n\023registrationTimeUTC\030\002 \001(\003\022(\n\010en" +
+      "dpoint\030\003 \001(\0132\026.exec.DrillbitEndpoint\"\227\001\n" +
+      "\005Roles\022\027\n\tsql_query\030\001 \001(\010:\004true\022\032\n\014logic" +
+      "al_plan\030\002 \001(\010:\004true\022\033\n\rphysical_plan\030\003 \001" +
+      "(\010:\004true\022\033\n\rjava_executor\030\004 \001(\010:\004true\022\037\n",
+      "\021distributed_cache\030\005 \001(\010:\004trueB3\n\033org.ap" +
+      "ache.drill.exec.protoB\022CoordinationProto" +
+      "sH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -2442,7 +2599,7 @@ public final class CoordinationProtos {
           internal_static_exec_DrillbitEndpoint_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_DrillbitEndpoint_descriptor,
-              new java.lang.String[] { "Address", "UserPort", "ControlPort", "DataPort", "Roles", });
+              new java.lang.String[] { "Address", "UserPort", "ControlPort", "DataPort", "Roles", "Version", });
           internal_static_exec_DrillServiceInstance_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_exec_DrillServiceInstance_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java
index 722e6f2..a7d83e4 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaCoordinationProtos.java
@@ -46,6 +46,8 @@ public final class SchemaCoordinationProtos
                 if(message.hasRoles())
                     output.writeObject(5, message.getRoles(), org.apache.drill.exec.proto.SchemaCoordinationProtos.Roles.WRITE, false);
 
+                if(message.hasVersion())
+                    output.writeString(6, message.getVersion(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint message)
             {
@@ -101,6 +103,9 @@ public final class SchemaCoordinationProtos
                             builder.setRoles(input.mergeObject(org.apache.drill.exec.proto.CoordinationProtos.Roles.newBuilder(), org.apache.drill.exec.proto.SchemaCoordinationProtos.Roles.MERGE));
 
                             break;
+                        case 6:
+                            builder.setVersion(input.readString());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -146,6 +151,7 @@ public final class SchemaCoordinationProtos
                 case 3: return "controlPort";
                 case 4: return "dataPort";
                 case 5: return "roles";
+                case 6: return "version";
                 default: return null;
             }
         }
@@ -162,6 +168,7 @@ public final class SchemaCoordinationProtos
             fieldMap.put("controlPort", 3);
             fieldMap.put("dataPort", 4);
             fieldMap.put("roles", 5);
+            fieldMap.put("version", 6);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java
index 71daf56..2257763 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillbitEndpoint.java
@@ -52,6 +52,7 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE
     private int controlPort;
     private int dataPort;
     private Roles roles;
+    private String version;
 
     public DrillbitEndpoint()
     {
@@ -125,6 +126,19 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE
         return this;
     }
 
+    // version
+
+    public String getVersion()
+    {
+        return version;
+    }
+
+    public DrillbitEndpoint setVersion(String version)
+    {
+        this.version = version;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -195,6 +209,9 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE
                     message.roles = input.mergeObject(message.roles, Roles.getSchema());
                     break;
 
+                case 6:
+                    message.version = input.readString();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -219,6 +236,9 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE
         if(message.roles != null)
              output.writeObject(5, message.roles, Roles.getSchema(), false);
 
+
+        if(message.version != null)
+            output.writeString(6, message.version, false);
     }
 
     public String getFieldName(int number)
@@ -230,6 +250,7 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE
             case 3: return "controlPort";
             case 4: return "dataPort";
             case 5: return "roles";
+            case 6: return "version";
             default: return null;
         }
     }
@@ -248,6 +269,7 @@ public final class DrillbitEndpoint implements Externalizable, Message<DrillbitE
         __fieldMap.put("controlPort", 3);
         __fieldMap.put("dataPort", 4);
         __fieldMap.put("roles", 5);
+        __fieldMap.put("version", 6);
     }
     
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/63ffeff8/protocol/src/main/protobuf/Coordination.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/Coordination.proto b/protocol/src/main/protobuf/Coordination.proto
index 28c99d6..3f15cf9 100644
--- a/protocol/src/main/protobuf/Coordination.proto
+++ b/protocol/src/main/protobuf/Coordination.proto
@@ -10,6 +10,7 @@ message DrillbitEndpoint{
   optional int32 control_port = 3;
   optional int32 data_port = 4;
   optional Roles roles = 5;
+  optional string version = 6;
 }
 
 message DrillServiceInstance{


[2/5] drill git commit: DRILL-5094: Comparator should guarantee transitive attribute.

Posted by am...@apache.org.
DRILL-5094: Comparator should guarantee transitive attribute.

close apache/drill#675


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/db9d1b1e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/db9d1b1e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/db9d1b1e

Branch: refs/heads/master
Commit: db9d1b1e85e06197ef197c773394dfa724c1259c
Parents: 63ffeff
Author: chunhui-shi <cs...@maprtech.com>
Authored: Thu Dec 1 17:55:12 2016 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Fri Dec 2 08:38:56 2016 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/store/schedule/AssignmentCreator.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/db9d1b1e/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index eed200e..127264a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -50,7 +50,8 @@ public class AssignmentCreator<T extends CompleteWork> {
   private static Comparator<Entry<DrillbitEndpoint,Long>> comparator = new Comparator<Entry<DrillbitEndpoint,Long>>() {
     @Override
     public int compare(Entry<DrillbitEndpoint, Long> o1, Entry<DrillbitEndpoint,Long> o2) {
-      return (int) (o1.getValue() - o2.getValue());
+      long ret = o1.getValue() - o2.getValue();
+      return ret > 0? 1 : ((ret < 0)? -1: 0);
     }
   };
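
For context, a minimal sketch (not from the commit) of why the old subtraction-based
comparator was unsafe: once the long difference overflows the int range, the cast flips the
sign, which is enough to violate the Comparator contract and can surface from TimSort as
"Comparison method violates its general contract!".

    // OverflowSketch.java -- why (int)(a - b) is not a safe comparator result for longs.
    public class OverflowSketch {
      public static void main(String[] args) {
        long a = 3_000_000_000L;             // hypothetical byte counts assigned to two endpoints
        long b = 0L;
        int broken = (int) (a - b);          // wraps to a negative int, so a looks "smaller"
        int fixed = Long.compare(a, b);      // 1, the correct sign
        System.out.println(broken + " vs " + fixed);  // -1294967296 vs 1
      }
    }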
 


[4/5] drill git commit: DRILL-5050: C++ client library has symbol resolution issues when loaded by a process that already uses boost::asio

Posted by am...@apache.org.
DRILL-5050: C++ client library has symbol resolution issues when loaded by a process that already uses boost::asio

Build with Boost static libs and the drill_boost namespace on macOS. Added a
readme with instructions.

DRILL-5050: Addressed review comments

DRILL-5050: address more review comments

close apache/drill#659


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/42006ad3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/42006ad3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/42006ad3

Branch: refs/heads/master
Commit: 42006ad3324c778b3f3867079c9e75121c743c73
Parents: 9062901
Author: Parth Chandra <pa...@apache.org>
Authored: Mon Nov 14 14:01:29 2016 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sun Dec 4 07:25:35 2016 -0800

----------------------------------------------------------------------
 contrib/native/client/CMakeLists.txt |  14 ++--
 contrib/native/client/readme.boost   |  56 ++++++++++++++++
 contrib/native/client/readme.macos   | 108 ++++++++++++++++++++++++++++++
 3 files changed, 173 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/42006ad3/contrib/native/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt
index 7e22ce8..65e3b85 100644
--- a/contrib/native/client/CMakeLists.txt
+++ b/contrib/native/client/CMakeLists.txt
@@ -84,23 +84,27 @@ add_definitions("-DGIT_COMMIT_PROP=${GIT_COMMIT_PROP}")
 
 
 # Find Boost
+set(Boost_USE_STATIC_LIBS ON)
+set(Boost_USE_MULTITHREADED ON)
 if(MSVC)
-    set(Boost_USE_STATIC_LIBS ON)
-    set(Boost_USE_MULTITHREADED ON)
     set(Boost_USE_STATIC_RUNTIME OFF)
 else()
-    set(Boost_USE_STATIC_LIBS OFF)
-    set(Boost_USE_MULTITHREADED ON)
-    set(Boost_USE_STATIC_RUNTIME OFF)
+    #    To build a production version, the linux/macos build must use a shaded version
+    #    of boost. Arbitrarily, we choose the new namespace to be drill_boost.
+    #    See the instructions in the readme for linux/macos and rebuild boost. Then
+    #    uncomment the line below to build
+    #    set(Boost_NAMESPACE drill_boost)
 endif()
 
 find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono thread random)
 include_directories(${Boost_INCLUDE_DIRS})
 
+
 if(CMAKE_COMPILER_IS_GNUCXX)
     set(CMAKE_EXE_LINKER_FLAGS "-lrt -lpthread")
     set(CMAKE_CXX_FLAGS "-fPIC")
 endif()
+
 if(MSVC)
     set(CMAKE_CXX_FLAGS "/EHsc")
 endif()

http://git-wip-us.apache.org/repos/asf/drill/blob/42006ad3/contrib/native/client/readme.boost
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.boost b/contrib/native/client/readme.boost
new file mode 100644
index 0000000..a6035e4
--- /dev/null
+++ b/contrib/native/client/readme.boost
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+Building Boost for Drill on MacOs/Linux
+--------------------------------
+
+These instructions are using Boost version 1.60.0
+
+Assuming there is a BOOST_BUILD_DIR 
+
+$ cd $BOOST_BUILD_DIR
+$ tar zxf boost_1_60_0.tar.gz
+$ cd $BOOST_BUILD_DIR/boost_1_60_0
+$ ./bootstrap.sh --prefix=$BOOST_BUILD_DIR/boost_1_60_0/
+$ ./b2 tools/bcp
+$ cd $BOOST_BUILD_DIR/drill_boost_1_60_0
+
+# Use boost bcp to rename the boost namespace to drill_boost
+# the following builds a subset of boost without icu. You may need to add more modules to include icu. 
+# bcp documentation can be found here: http://www.boost.org/doc/libs/1_60_0/tools/bcp/doc/html/index.html
+
+$ $BOOST_BUILD_DIR/boost_1_60_0/dist/bin/bcp --namespace=drill_boost --namespace-alias --boost=$BOOST_BUILD_DIR/boost_1_60_0/ shared_ptr random context chrono date_time regex system timer thread asio smart_ptr bind config build regex config assign functional multiprecision $BOOST_BUILD_DIR/drill_boost_1_60_0 
+
+$ cd $BOOST_BUILD_DIR/drill_boost_1_60_0
+$ ./bootstrap.sh --prefix=$BOOST_BUILD_DIR/drill_boost_1_60_0/
+
+# change the variant to debug for a debug build
+  # For linux 
+  $ ./b2 --build-dir=$BOOST_BUILD_DIR/drill_boost_1_60_0/build variant=release link=static threading=multi cxxflags="-fPIC"
+  # For MacOS
+  $ ./b2 --build-dir=$BOOST_BUILD_DIR/drill_boost_1_60_0/build variant=release link=static threading=multi 
+
+
+# To build the Drill client library , export the following to make sure boost is picked up correctly
+$ export BOOST_INCLUDEDIR=$BOOST_BUILD_DIR/drill_boost_1_60_0
+$ export BOOST_LIBRARYDIR=$BOOST_BUILD_DIR/drill_boost_1_60_0/stage/lib
+$ export Boost_NO_SYSTEM_PATHS=ON
+
+# Then follow the usual CMake build steps.
+
+

http://git-wip-us.apache.org/repos/asf/drill/blob/42006ad3/contrib/native/client/readme.macos
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.macos b/contrib/native/client/readme.macos
new file mode 100644
index 0000000..4785e87
--- /dev/null
+++ b/contrib/native/client/readme.macos
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+MacOS build (tested on OS X El Capitan)
+
+Install Prerequisites
+---------------------
+
+0.1) Install XCode 
+  Download and install from here: https://developer.apple.com/xcode/downloads/
+  or from the App store https://itunes.apple.com/us/app/xcode/id497799835?mt=12
+  In Terminal, install the command line tools 
+    $> xcode-select --install
+
+0.2) Install brew following the instructions here: http://brew.sh/
+
+1) CMAKE 3.0 or above
+  Download and install Cmake : https://cmake.org/download/
+  or use brew to install 
+  $> brew install cmake
+
+
+2.1) Install protobuf 2.5.0 (or higher)
+  $> brew install protobuf
+
+2.2) Install zookeeper
+  $> brew install zookeeper
+
+2.3) Install boost
+  $> brew install boost
+
+2.3.1) For production builds, see the readme.boost file  
+  
+2.3.1.1 Build using XCODE
+=========================  
+(Optional) Refresh protobuf source files
+----------------------------------------
+When changes have been introduced to the protocol module, you might need to refresh the protobuf C++ source files too.
+  $> cd DRILL_DIR/contrib/native/client
+  $> mkdir build
+  $> cd build && cmake3 -G "XCode" -D CMAKE_BUILD_TYPE=Debug ..
+  $> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target fixProtobufs
+  $> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target cpProtobufs
+
+Open a pull request with the changes to DRILL_DIR/contrib/native/client/src/protobuf
+
+Build drill client
+-------------------
+  $> cd DRILL_DIR/contrib/native/client
+  $> mkdir build
+  $> cd build && cmake3 -G "XCode" -D CMAKE_BUILD_TYPE=Debug ..
+  $> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target ALL_BUILD
+
+
+XCode IDE
+---------
+  You can open the drillclient.xcodeproj file in the XCode ide and run/debug as with any other command line app
+
+2.3.1.2 Build using MAKE
+========================
+(Optional) Refresh protobuf source files
+----------------------------------------
+When changes have been introduced to the protocol module, you might need to refresh the protobuf C++ source files too.
+    $> cd DRILL_DIR/contrib/native/client
+    $> mkdir build
+    $> cd build && cmake3 -G "Unix Makefiles" ..
+    $> make cpProtobufs
+
+Open a pull request with the changes to DRILL_DIR/contrib/native/client/src/protobuf
+
+Build drill client
+-------------------
+    $> cd DRILL_DIR/contrib/native/client
+    $> mkdir build
+    $> cd build && cmake3 -G "Unix Makefiles" -D CMAKE_BUILD_TYPE=Debug ..
+    $> make
+
+
+2.4 Test
+--------
+Run query submitter from the command line
+  $> querySubmitter query='select * from dfs.`/Users/pchandra/work/data/tpc-h/customer.parquet`' type=sql connectStr=local=10.250.0.146:31010 api=async logLevel=trace user=yourUserName password=yourPassWord
+
+2.5 Valgrind
+------------
+  Install valgrind using brew
+  $> brew install valgrind
+  $> valgrind --leak-check=yes querySubmitter query='select LINEITEM from dfs.`/Users/pchandra/work/data/tpc-h/customer.parquet`' type=sql connectStr=local=10.250.0.146:31010 api=async logLevel=trace
+
+
+
+


[3/5] drill git commit: DRILL-5086: Fix conversion of min and max values to appropriate data type.

Posted by am...@apache.org.
DRILL-5086: Fix conversion of min and max values to appropriate data type.

close apache/drill#674


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9062901f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9062901f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9062901f

Branch: refs/heads/master
Commit: 9062901fd1656ffacbd55a2b5cc7de6a03982590
Parents: db9d1b1
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed Nov 30 15:19:36 2016 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sun Dec 4 07:25:13 2016 -0800

----------------------------------------------------------------------
 .../store/parquet/stat/ParquetMetaStatCollector.java    | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9062901f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
index 3fe10c8..d86f863 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
@@ -137,23 +137,23 @@ public class ParquetMetaStatCollector implements  ColumnStatCollector{
       switch (type.getMinorType()) {
       case INT :
       case TIME:
-        ((IntStatistics) stat).setMinMax(((Integer) min).intValue(), ((Integer) max).intValue());
+        ((IntStatistics) stat).setMinMax(Integer.parseInt(min.toString()), Integer.parseInt(max.toString()));
         break;
       case BIGINT:
       case TIMESTAMP:
-        ((LongStatistics) stat).setMinMax(((Long) min).longValue(), ((Long) max).longValue());
+        ((LongStatistics) stat).setMinMax(Long.parseLong(min.toString()), Long.parseLong(max.toString()));
         break;
       case FLOAT4:
-        ((FloatStatistics) stat).setMinMax(((Float) min).floatValue(), ((Float) max).floatValue());
+        ((FloatStatistics) stat).setMinMax(Float.parseFloat(min.toString()), Float.parseFloat(max.toString()));
         break;
       case FLOAT8:
-        ((DoubleStatistics) stat).setMinMax(((Double) min).doubleValue(), ((Double) max).doubleValue());
+        ((DoubleStatistics) stat).setMinMax(Double.parseDouble(min.toString()), Double.parseDouble(max.toString()));
         break;
       case DATE:
         convertedStat = new LongStatistics();
         convertedStat.setNumNulls(stat.getNumNulls());
-        final long minMS = convertToDrillDateValue(((Integer) min).intValue());
-        final long maxMS = convertToDrillDateValue(((Integer) max).intValue());
+        final long minMS = convertToDrillDateValue(Integer.parseInt(min.toString()));
+        final long maxMS = convertToDrillDateValue(Integer.parseInt(max.toString()));
         ((LongStatistics) convertedStat ).setMinMax(minMS, maxMS);
         break;
       default:
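
A hedged note on the change above: the min/max statistics objects are apparently not always
boxed as the exact Number subtype the old casts assumed (for example, an INT column's
statistic may arrive as a Long), in which case the cast throws ClassCastException, while
parsing the toString() form works for any numeric boxing. A minimal sketch with a
hypothetical value:

    // CastVsParseSketch.java -- illustration only.
    public class CastVsParseSketch {
      public static void main(String[] args) {
        Object min = Long.valueOf(42);                // statistic deserialized as a Long
        // int bad = ((Integer) min).intValue();      // would throw ClassCastException
        int ok = Integer.parseInt(min.toString());    // robust regardless of the boxed type
        System.out.println(ok);                       // 42
      }
    }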


[5/5] drill git commit: DRILL-4982: Separate Hive reader classes for different data formats to improve performance.

Posted by am...@apache.org.
DRILL-4982: Separate Hive reader classes for different data formats to improve performance.

1. Separating the Hive reader classes allows optimizations to be applied to each class in its own way (a simplified sketch follows this list). This separation effectively avoids the scan performance degradation.

2. Do not apply the skip header/footer mechanism to most Hive formats, since that mechanism introduces extra checks on each incoming record.
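
A minimal sketch of the idea behind point 1 (illustration only; the real readers are
generated from the HiveRecordReaders.java template shown in the diff below). Giving each
format its own concrete class keeps the hot next() call site, and its JIT profile, per
format instead of funneling every format through one shared implementation:

    // ReaderSketch.java -- hypothetical, simplified shape of the per-format readers.
    public class ReaderSketch {
      abstract static class FormatReader {
        abstract int next();                          // performance-critical read loop
      }
      static final class TextReader extends FormatReader {
        @Override int next() { /* text-specific loop, may skip header/footer rows */ return 0; }
      }
      static final class ParquetReader extends FormatReader {
        @Override int next() { /* parquet-specific loop, no header/footer checks */ return 0; }
      }
      public static void main(String[] args) {
        FormatReader r = new ParquetReader();
        System.out.println(r.next());                 // each concrete reader keeps its own profile
      }
    }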

close apache/drill#638


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/68bd27a1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/68bd27a1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/68bd27a1

Branch: refs/heads/master
Commit: 68bd27a128c2f244bd504369dce510727ea28da7
Parents: 42006ad
Author: chunhui-shi <cs...@maprtech.com>
Authored: Sun Oct 30 01:29:06 2016 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sun Dec 4 08:35:05 2016 -0800

----------------------------------------------------------------------
 .../core/src/main/codegen/config.fmpp           |   1 +
 .../core/src/main/codegen/data/HiveFormats.tdd  |  50 ++
 .../codegen/templates/HiveRecordReaders.java    | 300 +++++++++++
 .../exec/store/hive/HiveAbstractReader.java     | 361 +++++++++++++
 .../hive/HiveDrillNativeScanBatchCreator.java   |   2 +-
 .../drill/exec/store/hive/HiveRecordReader.java | 515 -------------------
 .../exec/store/hive/HiveScanBatchCreator.java   |  58 ++-
 7 files changed, 752 insertions(+), 535 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/codegen/config.fmpp
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/config.fmpp b/contrib/storage-hive/core/src/main/codegen/config.fmpp
index cd36891..d8ca3fa 100644
--- a/contrib/storage-hive/core/src/main/codegen/config.fmpp
+++ b/contrib/storage-hive/core/src/main/codegen/config.fmpp
@@ -16,6 +16,7 @@
 
 data: {
     drillOI:tdd(../data/HiveTypes.tdd)
+    hiveFormat:tdd(../data/HiveFormats.tdd)
 }
 freemarkerLinks: {
     includes: includes/

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd b/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
new file mode 100644
index 0000000..5200e4a
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http:# www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  map: [
+    {
+      hiveFormat: "HiveAvro",
+      hiveReader: "Avro",
+      hasHeaderFooter: false,
+    },
+    {
+      hiveFormat: "HiveParquet",
+      hiveReader: "Parquet",
+      hasHeaderFooter: false,
+    },
+    {
+      hiveFormat: "HiveText",
+      hiveReader: "Text",
+      hasHeaderFooter: true,
+    },
+    {
+      hiveFormat: "HiveOrc",
+      hiveReader: "Orc",
+      hasHeaderFooter: false,
+    },
+    {
+       hiveFormat: "HiveRCFile",
+       hiveReader: "RCFile",
+       hasHeaderFooter: false,
+    },
+    {
+      hiveFormat: "HiveDefault",
+      hiveReader: "Default",
+      hasHeaderFooter: false,
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
new file mode 100644
index 0000000..0dc8c08
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This template is used to generate different Hive record reader classes for different data formats
+ * to avoid JIT profile pollution. These readers are derived from HiveAbstractReader, which implements
+ * the init and setup stages, but the repeated - and performance-critical - next() method is
+ * implemented separately in the classes generated from this template. The internal SkipRecordsInspector
+ * class is separated as well, for the same reason.
+ *
+ * As to the performance gain with this change, please refer to:
+ * https://issues.apache.org/jira/browse/DRILL-4982
+ *
+ */
+<@pp.dropOutputFile />
+<#list hiveFormat.map as entry>
+<@pp.changeOutputFile name="/org/apache/drill/exec/store/hive/Hive${entry.hiveReader}Reader.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.store.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+
+import org.apache.hadoop.mapred.RecordReader;
+<#if entry.hasHeaderFooter == true>
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+</#if>
+
+public class Hive${entry.hiveReader}Reader extends HiveAbstractReader {
+
+  Object key;
+<#if entry.hasHeaderFooter == true>
+  SkipRecordsInspector skipRecordsInspector;
+<#else>
+  Object value;
+</#if>
+
+  public Hive${entry.hiveReader}Reader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+                       FragmentContext context, final HiveConf hiveConf,
+                       UserGroupInformation proxyUgi) throws ExecutionSetupException {
+    super(table, partition, inputSplit, projectedColumns, context, hiveConf, proxyUgi);
+  }
+
+  public void internalInit(Properties tableProperties, RecordReader<Object, Object> reader) {
+
+    key = reader.createKey();
+<#if entry.hasHeaderFooter == true>
+    skipRecordsInspector = new SkipRecordsInspector(tableProperties, reader);
+<#else>
+    value = reader.createValue();
+</#if>
+
+  }
+  private void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
+    for (int i = 0; i < selectedStructFieldRefs.size(); i++) {
+      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs.get(i));
+      if (hiveValue != null) {
+        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
+          vectors.get(i), outputRecordIndex);
+      }
+    }
+  }
+
+<#if entry.hasHeaderFooter == true>
+  @Override
+  public int next() {
+    for (ValueVector vv : vectors) {
+      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
+    }
+    if (empty) {
+      setValueCountAndPopulatePartitionVectors(0);
+      return 0;
+    }
+
+    try {
+      skipRecordsInspector.reset();
+      Object value;
+
+      int recordCount = 0;
+
+      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value = skipRecordsInspector.getNextValue())) {
+        if (skipRecordsInspector.doSkipHeader(recordCount++)) {
+          continue;
+        }
+        Object bufferedValue = skipRecordsInspector.bufferAdd(value);
+        if (bufferedValue != null) {
+          Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue);
+          if (partTblObjectInspectorConverter != null) {
+            deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
+          }
+          readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, skipRecordsInspector.getActualCount());
+          skipRecordsInspector.incrementActualCount();
+        }
+        skipRecordsInspector.incrementTempCount();
+      }
+
+      setValueCountAndPopulatePartitionVectors(skipRecordsInspector.getActualCount());
+      skipRecordsInspector.updateContinuance();
+      return skipRecordsInspector.getActualCount();
+    } catch (IOException | SerDeException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+/**
+ * SkipRecordsInspector encapsulates the logic to skip header and footer lines in a file.
+ * The logic applies only to the file formats predefined in the constructor.
+ */
+protected class SkipRecordsInspector {
+
+  private final Set<Object> fileFormats;
+  private int headerCount;
+  private int footerCount;
+  private Queue<Object> footerBuffer;
+  // indicates if we continue reading the same file
+  private boolean continuance;
+  private int holderIndex;
+  private List<Object> valueHolder;
+  private int actualCount;
+  // actualCount without headerCount, used to determine holderIndex
+  private int tempCount;
+
+  protected SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
+    this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
+    this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
+    this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
+    logger.debug("skipRecordInspector: fileFormat {}, headerCount {}, footerCount {}",
+        this.fileFormats, this.headerCount, this.footerCount);
+    this.footerBuffer = Lists.newLinkedList();
+    this.continuance = false;
+    this.holderIndex = -1;
+    this.valueHolder = initializeValueHolder(reader, footerCount);
+    this.actualCount = 0;
+    this.tempCount = 0;
+  }
+
+  protected boolean doSkipHeader(int recordCount) {
+    return !continuance && recordCount < headerCount;
+  }
+
+  protected void reset() {
+    tempCount = holderIndex + 1;
+    actualCount = 0;
+    if (!continuance) {
+      footerBuffer.clear();
+    }
+  }
+
+  protected Object bufferAdd(Object value) throws SerDeException {
+    footerBuffer.add(value);
+    if (footerBuffer.size() <= footerCount) {
+      return null;
+    }
+    return footerBuffer.poll();
+  }
+
+  protected Object getNextValue() {
+    holderIndex = tempCount % getHolderSize();
+    return valueHolder.get(holderIndex);
+  }
+
+  private int getHolderSize() {
+    return valueHolder.size();
+  }
+
+  protected void updateContinuance() {
+    this.continuance = actualCount != 0;
+  }
+
+  protected int incrementTempCount() {
+    return ++tempCount;
+  }
+
+  protected int getActualCount() {
+    return actualCount;
+  }
+
+  protected int incrementActualCount() {
+    return ++actualCount;
+  }
+
+  /**
+   * Retrieves a positive numeric property from the Properties object by name.
+   * Returns the default value if
+   * 1. the file format is absent from the predefined file formats list,
+   * 2. the property doesn't exist in the table properties, or
+   * 3. the property value is negative;
+   * otherwise casts the value to int.
+   *
+   * @param tableProperties property holder
+   * @param propertyName    name of the property
+   * @param defaultValue    default value
+   * @return property numeric value
+   * @throws NumberFormatException if property value is non-numeric
+   */
+  protected int retrievePositiveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
+    int propertyIntValue = defaultValue;
+    if (!fileFormats.contains(tableProperties.get(hive_metastoreConstants.FILE_INPUT_FORMAT))) {
+      return propertyIntValue;
+    }
+    Object propertyObject = tableProperties.get(propertyName);
+    if (propertyObject != null) {
+      try {
+        propertyIntValue = Integer.valueOf((String) propertyObject);
+      } catch (NumberFormatException e) {
+        throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric", propertyName, propertyObject.toString()));
+      }
+    }
+    return propertyIntValue < 0 ? defaultValue : propertyIntValue;
+  }
+
+  /**
+   * Creates a buffer of objects to be used as values, so that these objects can be re-used.
+   * The number of objects is the number of lines to skip at the end of the file plus one.
+   *
+   * @param reader          RecordReader to return value object
+   * @param skipFooterLines number of lines to skip at the end of the file
+   * @return list of objects to be used as values
+   */
+  private List<Object> initializeValueHolder(RecordReader reader, int skipFooterLines) {
+    List<Object> valueHolder = new ArrayList<>(skipFooterLines + 1);
+    for (int i = 0; i <= skipFooterLines; i++) {
+      valueHolder.add(reader.createValue());
+    }
+    return valueHolder;
+  }
+ }
+
+<#else>
+  @Override
+  public int next() {
+    for (ValueVector vv : vectors) {
+      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
+    }
+    if (empty) {
+      setValueCountAndPopulatePartitionVectors(0);
+      return 0;
+    }
+
+    try {
+      int recordCount = 0;
+      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
+        Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
+        if (partTblObjectInspectorConverter != null) {
+          deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
+        }
+        readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount);
+        recordCount++;
+      }
+
+      setValueCountAndPopulatePartitionVectors(recordCount);
+      return recordCount;
+    } catch (IOException | SerDeException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+</#if>
+
+}
+</#list>
\ No newline at end of file

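The header/footer handling in the template above boils down to a small buffering scheme: the first headerCount records of a new file are dropped outright, and every subsequent value is queued until the queue holds more than footerCount entries, so the last footerCount values of each file are never emitted. A minimal, self-contained sketch of the same idea (illustrative names only, not the Drill classes):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Minimal sketch of the header/footer skipping idea behind SkipRecordsInspector.
    // Class and variable names are illustrative; this is not the Drill implementation.
    public class SkipSketch {

      static class FooterSkipBuffer {
        private final int footerCount;
        private final Queue<String> buffer = new ArrayDeque<>();

        FooterSkipBuffer(int footerCount) {
          this.footerCount = footerCount;
        }

        // Returns null while the buffer may still be holding footer lines,
        // otherwise returns the oldest buffered value, which is now known not to be a footer.
        String add(String value) {
          buffer.add(value);
          return buffer.size() <= footerCount ? null : buffer.poll();
        }
      }

      public static void main(String[] args) {
        int headerCount = 1;
        int footerCount = 2;
        String[] file = {"header", "row1", "row2", "row3", "footer1", "footer2"};

        FooterSkipBuffer footerBuffer = new FooterSkipBuffer(footerCount);
        for (int i = 0; i < file.length; i++) {
          if (i < headerCount) {
            continue;                      // skip header lines of a new file
          }
          String emitted = footerBuffer.add(file[i]);
          if (emitted != null) {
            System.out.println(emitted);   // prints row1, row2, row3
          }
        }
        // footer1 and footer2 stay in the buffer and are dropped when the next file starts.
      }
    }
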
http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
new file mode 100644
index 0000000..107fc66
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.security.UserGroupInformation;
+
+
+public abstract class HiveAbstractReader extends AbstractRecordReader {
+  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveAbstractReader.class);
+
+  protected final DrillBuf managedBuffer;
+
+  protected Table table;
+  protected Partition partition;
+  protected InputSplit inputSplit;
+  protected List<String> selectedColumnNames;
+  protected List<StructField> selectedStructFieldRefs = Lists.newArrayList();
+  protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
+  protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList();
+  protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList();
+  protected List<String> selectedPartitionNames = Lists.newArrayList();
+  protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList();
+  protected List<Object> selectedPartitionValues = Lists.newArrayList();
+
+  // SerDe of the reading partition (or table if the table is non-partitioned)
+  protected SerDe partitionSerDe;
+
+  // ObjectInspector to read data from partitionSerDe (for a non-partitioned table this is same as the table
+  // ObjectInspector).
+  protected StructObjectInspector partitionOI;
+
+  // Final ObjectInspector. We may not use the partitionOI directly if there are schema changes between the table and
+  // partition. If there are no schema changes then this is same as the partitionOI.
+  protected StructObjectInspector finalOI;
+
+  // Converter which converts data from partition schema to table schema.
+  protected Converter partTblObjectInspectorConverter;
+
+  protected Object key;
+  protected RecordReader<Object, Object> reader;
+  protected List<ValueVector> vectors = Lists.newArrayList();
+  protected List<ValueVector> pVectors = Lists.newArrayList();
+  protected boolean empty;
+  protected HiveConf hiveConf;
+  protected FragmentContext fragmentContext;
+  protected String defaultPartitionValue;
+  protected final UserGroupInformation proxyUgi;
+
+
+  protected static final int TARGET_RECORD_COUNT = 4000;
+
+  public HiveAbstractReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+                       FragmentContext context, final HiveConf hiveConf,
+                       UserGroupInformation proxyUgi) throws ExecutionSetupException {
+    this.table = table;
+    this.partition = partition;
+    this.inputSplit = inputSplit;
+    this.empty = (inputSplit == null && partition == null);
+    this.hiveConf = hiveConf;
+    this.fragmentContext = context;
+    this.proxyUgi = proxyUgi;
+    this.managedBuffer = fragmentContext.getManagedBuffer().reallocIfNeeded(256);
+    setColumns(projectedColumns);
+  }
+
+  public abstract void internalInit(Properties tableProperties, RecordReader<Object, Object> reader);
+
+  private void init() throws ExecutionSetupException {
+    final JobConf job = new JobConf(hiveConf);
+
+    // Get the configured default val
+    defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
+
+    Properties tableProperties;
+    try {
+      tableProperties = MetaStoreUtils.getTableMetadata(table);
+      final Properties partitionProperties =
+          (partition == null) ?  tableProperties :
+              HiveUtilities.getPartitionMetadata(partition, table);
+      HiveUtilities.addConfToJob(job, partitionProperties);
+
+      final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), tableProperties);
+      final StructObjectInspector tableOI = getStructOI(tableSerDe);
+
+      if (partition != null) {
+        partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), partitionProperties);
+        partitionOI = getStructOI(partitionSerDe);
+
+        finalOI = (StructObjectInspector)ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
+        partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), table));
+      } else {
+        // For non-partitioned tables, there is no need to create converter as there are no schema changes expected.
+        partitionSerDe = tableSerDe;
+        partitionOI = tableOI;
+        partTblObjectInspectorConverter = null;
+        finalOI = tableOI;
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
+      }
+
+      if (logger.isTraceEnabled()) {
+        for (StructField field: finalOI.getAllStructFieldRefs()) {
+          logger.trace("field in finalOI: {}", field.getClass().getName());
+        }
+        logger.trace("partitionSerDe class is {} {}", partitionSerDe.getClass().getName());
+      }
+      // Get list of partition column names
+      final List<String> partitionNames = Lists.newArrayList();
+      for (FieldSchema field : table.getPartitionKeys()) {
+        partitionNames.add(field.getName());
+      }
+
+      // We should always get the column names from the ObjectInspector. For some tables (e.g. Avro) the metastore
+      // may not contain the schema; instead it is derived from other sources such as table properties or an external file.
+      // SerDe object knows how to get the schema with all the config and table properties passed in initialization.
+      // ObjectInspector created from the SerDe object has the schema.
+      final StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalOI);
+      final List<String> tableColumnNames = sTypeInfo.getAllStructFieldNames();
+
+      // Select list of columns for project pushdown into Hive SerDe readers.
+      final List<Integer> columnIds = Lists.newArrayList();
+      if (isStarQuery()) {
+        selectedColumnNames = tableColumnNames;
+        for(int i=0; i<selectedColumnNames.size(); i++) {
+          columnIds.add(i);
+        }
+        selectedPartitionNames = partitionNames;
+      } else {
+        selectedColumnNames = Lists.newArrayList();
+        for (SchemaPath field : getColumns()) {
+          String columnName = field.getRootSegment().getPath();
+          if (partitionNames.contains(columnName)) {
+            selectedPartitionNames.add(columnName);
+          } else {
+            columnIds.add(tableColumnNames.indexOf(columnName));
+            selectedColumnNames.add(columnName);
+          }
+        }
+      }
+      ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames);
+
+      for (String columnName : selectedColumnNames) {
+        StructField fieldRef = finalOI.getStructFieldRef(columnName);
+        selectedStructFieldRefs.add(fieldRef);
+        ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
+
+        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());
+
+        selectedColumnObjInspectors.add(fieldOI);
+        selectedColumnTypes.add(typeInfo);
+        selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo, fragmentContext));
+      }
+
+      for(int i=0; i<selectedColumnNames.size(); ++i){
+        logger.trace("inspector:typeName={}, className={}, TypeInfo: {}, converter:{}",
+            selectedColumnObjInspectors.get(i).getTypeName(),
+            selectedColumnObjInspectors.get(i).getClass().getName(),
+            selectedColumnTypes.get(i).toString(),
+            selectedColumnFieldConverters.get(i).getClass().getName());
+      }
+
+      for (int i = 0; i < table.getPartitionKeys().size(); i++) {
+        FieldSchema field = table.getPartitionKeys().get(i);
+        if (selectedPartitionNames.contains(field.getName())) {
+          TypeInfo pType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
+          selectedPartitionTypes.add(pType);
+
+          if (partition != null) {
+            selectedPartitionValues.add(
+                HiveUtilities.convertPartitionType(pType, partition.getValues().get(i), defaultPartitionValue));
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new ExecutionSetupException("Failure while initializing Hive Reader " + this.getClass().getName(), e);
+    }
+
+    if (!empty) {
+      try {
+        reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
+        logger.trace("hive reader created: {} for inputSplit {}", reader.getClass().getName(), inputSplit.toString());
+      } catch (Exception e) {
+        throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
+      }
+
+      internalInit(tableProperties, reader);
+    }
+  }
+
+  /**
+   * Utility method which creates a SerDe object for given SerDe class name and properties.
+   */
+  private static SerDe createSerDe(final JobConf job, final String sLib, final Properties properties) throws Exception {
+    final Class<? extends SerDe> c = Class.forName(sLib).asSubclass(SerDe.class);
+    final SerDe serde = c.getConstructor().newInstance();
+    serde.initialize(job, properties);
+
+    return serde;
+  }
+
+  private static StructObjectInspector getStructOI(final SerDe serDe) throws Exception {
+    ObjectInspector oi = serDe.getObjectInspector();
+    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+      throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
+    }
+    return (StructObjectInspector) oi;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output)
+      throws ExecutionSetupException {
+    // initializes "reader"
+    final Callable<Void> readerInitializer = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        init();
+        return null;
+      }
+    };
+
+    final ListenableFuture<Void> result = context.runCallableAs(proxyUgi, readerInitializer);
+    try {
+      result.get();
+    } catch (InterruptedException e) {
+      result.cancel(true);
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
+    }
+    try {
+      final OptionManager options = fragmentContext.getOptions();
+      for (int i = 0; i < selectedColumnNames.size(); i++) {
+        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), options);
+        MaterializedField field = MaterializedField.create(selectedColumnNames.get(i), type);
+        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
+        vectors.add(output.addField(field, vvClass));
+      }
+
+      for (int i = 0; i < selectedPartitionNames.size(); i++) {
+        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedPartitionTypes.get(i), options);
+        MaterializedField field = MaterializedField.create(selectedPartitionNames.get(i), type);
+        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+        pVectors.add(output.addField(field, vvClass));
+      }
+    } catch(SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  /**
+   * To honor the Hive "skip.header.line.count" table property, the first N values from a file are skipped.
+   * Since a file can be read in batches (of up to TARGET_RECORD_COUNT records), additional checks are made
+   * to determine whether a batch starts a new file or continues the previous one.
+   *
+   * To honor the Hive "skip.footer.line.count" table property, values are buffered in a queue
+   * until the queue size exceeds the number of footer lines to skip; only then is the first value in the queue retrieved.
+   * A buffer of value objects is re-used in order to reduce the number of value objects created.
+   * For each new file the queue is cleared to drop the footer lines of the previous file.
+   */
+  @Override
+  public abstract int next();
+
+
+
+  protected void setValueCountAndPopulatePartitionVectors(int recordCount) {
+    for (ValueVector v : vectors) {
+      v.getMutator().setValueCount(recordCount);
+    }
+
+    if (partition != null) {
+      populatePartitionVectors(recordCount);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (Exception e) {
+      logger.warn("Failure while closing Hive Record reader.", e);
+    }
+  }
+
+  protected void populatePartitionVectors(int recordCount) {
+    for (int i = 0; i < pVectors.size(); i++) {
+      final ValueVector vector = pVectors.get(i);
+      final Object val = selectedPartitionValues.get(i);
+
+      AllocationHelper.allocateNew(vector, recordCount);
+
+      if (val != null) {
+        HiveUtilities.populateVector(vector, managedBuffer, val, 0, recordCount);
+      }
+
+      vector.getMutator().setValueCount(recordCount);
+    }
+  }
+
+}

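HiveAbstractReader.setup() above runs init() through context.runCallableAs(proxyUgi, readerInitializer), so the Hive InputFormat, SerDe and record reader are created as the impersonated query user rather than as the Drillbit process user. Outside of Drill's OperatorContext, the same effect comes from Hadoop's UserGroupInformation.doAs; a rough standalone sketch follows (the user name and the work inside run() are made up for illustration):

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.security.UserGroupInformation;

    // Rough sketch of impersonated initialization, analogous to what
    // OperatorContext.runCallableAs(proxyUgi, callable) does for HiveAbstractReader.init().
    public class ProxyInitSketch {
      public static void main(String[] args) throws Exception {
        // "queryUser" is a placeholder for the actual query user name.
        UserGroupInformation proxyUgi =
            UserGroupInformation.createProxyUser("queryUser", UserGroupInformation.getLoginUser());

        proxyUgi.doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            // In Drill this is where the Hive RecordReader and SerDe would be created,
            // so file system access checks are made against the impersonated user.
            System.out.println("running as " + UserGroupInformation.getCurrentUser().getUserName());
            return null;
          }
        });
      }
    }
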
http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 4be2ced..81be529 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -169,7 +169,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
     // If there are no readers created (which is possible when the table is empty or no row groups are matched),
     // create an empty RecordReader to output the schema
     if (readers.size() == 0) {
-      readers.add(new HiveRecordReader(table, null, null, columns, context, conf,
+      readers.add(new HiveDefaultReader(table, null, null, columns, context, conf,
         ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
deleted file mode 100644
index 8631b8d..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.security.UserGroupInformation;
-
-public class HiveRecordReader extends AbstractRecordReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveRecordReader.class);
-
-  private final DrillBuf managedBuffer;
-
-  protected Table table;
-  protected Partition partition;
-  protected InputSplit inputSplit;
-  protected List<String> selectedColumnNames;
-  protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
-  protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList();
-  protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList();
-  protected List<String> selectedPartitionNames = Lists.newArrayList();
-  protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList();
-  protected List<Object> selectedPartitionValues = Lists.newArrayList();
-
-  // SerDe of the reading partition (or table if the table is non-partitioned)
-  protected SerDe partitionSerDe;
-
-  // ObjectInspector to read data from partitionSerDe (for a non-partitioned table this is same as the table
-  // ObjectInspector).
-  protected StructObjectInspector partitionOI;
-
-  // Final ObjectInspector. We may not use the partitionOI directly if there are schema changes between the table and
-  // partition. If there are no schema changes then this is same as the partitionOI.
-  protected StructObjectInspector finalOI;
-
-  // Converter which converts data from partition schema to table schema.
-  private Converter partTblObjectInspectorConverter;
-
-  protected Object key;
-  protected RecordReader<Object, Object> reader;
-  protected List<ValueVector> vectors = Lists.newArrayList();
-  protected List<ValueVector> pVectors = Lists.newArrayList();
-  protected boolean empty;
-  private HiveConf hiveConf;
-  private FragmentContext fragmentContext;
-  private String defaultPartitionValue;
-  private final UserGroupInformation proxyUgi;
-  private SkipRecordsInspector skipRecordsInspector;
-
-  protected static final int TARGET_RECORD_COUNT = 4000;
-
-  public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
-                          FragmentContext context, final HiveConf hiveConf,
-                          UserGroupInformation proxyUgi) throws ExecutionSetupException {
-    this.table = table;
-    this.partition = partition;
-    this.inputSplit = inputSplit;
-    this.empty = (inputSplit == null && partition == null);
-    this.hiveConf = hiveConf;
-    this.fragmentContext = context;
-    this.proxyUgi = proxyUgi;
-    this.managedBuffer = fragmentContext.getManagedBuffer().reallocIfNeeded(256);
-    setColumns(projectedColumns);
-  }
-
-  private void init() throws ExecutionSetupException {
-    final JobConf job = new JobConf(hiveConf);
-
-    // Get the configured default val
-    defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
-
-    Properties tableProperties;
-    try {
-      tableProperties = MetaStoreUtils.getTableMetadata(table);
-      final Properties partitionProperties =
-          (partition == null) ?  tableProperties :
-              HiveUtilities.getPartitionMetadata(partition, table);
-      HiveUtilities.addConfToJob(job, partitionProperties);
-
-      final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), tableProperties);
-      final StructObjectInspector tableOI = getStructOI(tableSerDe);
-
-      if (partition != null) {
-        partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), partitionProperties);
-        partitionOI = getStructOI(partitionSerDe);
-
-        finalOI = (StructObjectInspector)ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
-        partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
-        job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), table));
-      } else {
-        // For non-partitioned tables, there is no need to create converter as there are no schema changes expected.
-        partitionSerDe = tableSerDe;
-        partitionOI = tableOI;
-        partTblObjectInspectorConverter = null;
-        finalOI = tableOI;
-        job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
-      }
-
-      // Get list of partition column names
-      final List<String> partitionNames = Lists.newArrayList();
-      for (FieldSchema field : table.getPartitionKeys()) {
-        partitionNames.add(field.getName());
-      }
-
-      // We should always get the columns names from ObjectInspector. For some of the tables (ex. avro) metastore
-      // may not contain the schema, instead it is derived from other sources such as table properties or external file.
-      // SerDe object knows how to get the schema with all the config and table properties passed in initialization.
-      // ObjectInspector created from the SerDe object has the schema.
-      final StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalOI);
-      final List<String> tableColumnNames = sTypeInfo.getAllStructFieldNames();
-
-      // Select list of columns for project pushdown into Hive SerDe readers.
-      final List<Integer> columnIds = Lists.newArrayList();
-      if (isStarQuery()) {
-        selectedColumnNames = tableColumnNames;
-        for(int i=0; i<selectedColumnNames.size(); i++) {
-          columnIds.add(i);
-        }
-        selectedPartitionNames = partitionNames;
-      } else {
-        selectedColumnNames = Lists.newArrayList();
-        for (SchemaPath field : getColumns()) {
-          String columnName = field.getRootSegment().getPath();
-          if (partitionNames.contains(columnName)) {
-            selectedPartitionNames.add(columnName);
-          } else {
-            columnIds.add(tableColumnNames.indexOf(columnName));
-            selectedColumnNames.add(columnName);
-          }
-        }
-      }
-      ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames);
-
-      for (String columnName : selectedColumnNames) {
-        ObjectInspector fieldOI = finalOI.getStructFieldRef(columnName).getFieldObjectInspector();
-        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());
-
-        selectedColumnObjInspectors.add(fieldOI);
-        selectedColumnTypes.add(typeInfo);
-        selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo, fragmentContext));
-      }
-
-      for (int i = 0; i < table.getPartitionKeys().size(); i++) {
-        FieldSchema field = table.getPartitionKeys().get(i);
-        if (selectedPartitionNames.contains(field.getName())) {
-          TypeInfo pType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
-          selectedPartitionTypes.add(pType);
-
-          if (partition != null) {
-            selectedPartitionValues.add(
-                HiveUtilities.convertPartitionType(pType, partition.getValues().get(i), defaultPartitionValue));
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new ExecutionSetupException("Failure while initializing HiveRecordReader: " + e.getMessage(), e);
-    }
-
-    if (!empty) {
-      try {
-        reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
-      } catch (Exception e) {
-        throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
-      }
-      key = reader.createKey();
-      skipRecordsInspector = new SkipRecordsInspector(tableProperties, reader);
-    }
-  }
-
-  /**
-   * Utility method which creates a SerDe object for given SerDe class name and properties.
-   */
-  private static SerDe createSerDe(final JobConf job, final String sLib, final Properties properties) throws Exception {
-    final Class<? extends SerDe> c = Class.forName(sLib).asSubclass(SerDe.class);
-    final SerDe serde = c.getConstructor().newInstance();
-    serde.initialize(job, properties);
-
-    return serde;
-  }
-
-  private static StructObjectInspector getStructOI(final SerDe serDe) throws Exception {
-    ObjectInspector oi = serDe.getObjectInspector();
-    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
-      throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
-    }
-    return (StructObjectInspector) oi;
-  }
-
-  @Override
-  public void setup(OperatorContext context, OutputMutator output)
-      throws ExecutionSetupException {
-    // initializes "reader"
-    final Callable<Void> readerInitializer = new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        init();
-        return null;
-      }
-    };
-
-    final ListenableFuture<Void> result = context.runCallableAs(proxyUgi, readerInitializer);
-    try {
-      result.get();
-    } catch (InterruptedException e) {
-      result.cancel(true);
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-    } catch (ExecutionException e) {
-      throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
-    }
-    try {
-      final OptionManager options = fragmentContext.getOptions();
-      for (int i = 0; i < selectedColumnNames.size(); i++) {
-        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), options);
-        MaterializedField field = MaterializedField.create(selectedColumnNames.get(i), type);
-        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
-        vectors.add(output.addField(field, vvClass));
-      }
-
-      for (int i = 0; i < selectedPartitionNames.size(); i++) {
-        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedPartitionTypes.get(i), options);
-        MaterializedField field = MaterializedField.create(selectedPartitionNames.get(i), type);
-        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
-        pVectors.add(output.addField(field, vvClass));
-      }
-    } catch(SchemaChangeException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  /**
-   * To take into account Hive "skip.header.lines.count" property first N values from file are skipped.
-   * Since file can be read in batches (depends on TARGET_RECORD_COUNT), additional checks are made
-   * to determine if it's new file or continuance.
-   *
-   * To take into account Hive "skip.footer.lines.count" property values are buffered in queue
-   * until queue size exceeds number of footer lines to skip, then first value in queue is retrieved.
-   * Buffer of value objects is used to re-use value objects in order to reduce number of created value objects.
-   * For each new file queue is cleared to drop footer lines from previous file.
-   */
-  @Override
-  public int next() {
-    for (ValueVector vv : vectors) {
-      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
-    }
-    if (empty) {
-      setValueCountAndPopulatePartitionVectors(0);
-      return 0;
-    }
-
-    try {
-      skipRecordsInspector.reset();
-      int recordCount = 0;
-      Object value;
-      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value = skipRecordsInspector.getNextValue())) {
-        if (skipRecordsInspector.doSkipHeader(recordCount++)) {
-          continue;
-        }
-        Object bufferedValue = skipRecordsInspector.bufferAdd(value);
-        if (bufferedValue != null) {
-          Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue);
-          if (partTblObjectInspectorConverter != null) {
-            deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
-          }
-          readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, skipRecordsInspector.getActualCount());
-          skipRecordsInspector.incrementActualCount();
-        }
-        skipRecordsInspector.incrementTempCount();
-      }
-
-      setValueCountAndPopulatePartitionVectors(skipRecordsInspector.getActualCount());
-      skipRecordsInspector.updateContinuance();
-      return skipRecordsInspector.getActualCount();
-    } catch (IOException | SerDeException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
-
-  private void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
-    for (int i = 0; i < selectedColumnNames.size(); i++) {
-      final String columnName = selectedColumnNames.get(i);
-      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, finalOI.getStructFieldRef(columnName));
-
-      if (hiveValue != null) {
-        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
-            vectors.get(i), outputRecordIndex);
-      }
-    }
-  }
-
-  private void setValueCountAndPopulatePartitionVectors(int recordCount) {
-    for (ValueVector v : vectors) {
-      v.getMutator().setValueCount(recordCount);
-    }
-
-    if (partition != null) {
-      populatePartitionVectors(recordCount);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (Exception e) {
-      logger.warn("Failure while closing Hive Record reader.", e);
-    }
-  }
-
-  protected void populatePartitionVectors(int recordCount) {
-    for (int i = 0; i < pVectors.size(); i++) {
-      final ValueVector vector = pVectors.get(i);
-      final Object val = selectedPartitionValues.get(i);
-
-      AllocationHelper.allocateNew(vector, recordCount);
-
-      if (val != null) {
-        HiveUtilities.populateVector(vector, managedBuffer, val, 0, recordCount);
-      }
-
-      vector.getMutator().setValueCount(recordCount);
-    }
-  }
-
-  /**
-   * SkipRecordsInspector encapsulates logic to skip header and footer from file.
-   * Logic is applicable only for predefined in constructor file formats.
-   */
-  private class SkipRecordsInspector {
-
-    private final Set<Object> fileFormats;
-    private int headerCount;
-    private int footerCount;
-    private Queue<Object> footerBuffer;
-    // indicates if we continue reading the same file
-    private boolean continuance;
-    private int holderIndex;
-    private List<Object> valueHolder;
-    private int actualCount;
-    // actualCount without headerCount, used to determine holderIndex
-    private int tempCount;
-
-    private SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
-      this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
-      this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
-      this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
-      this.footerBuffer = Lists.newLinkedList();
-      this.continuance = false;
-      this.holderIndex = -1;
-      this.valueHolder = initializeValueHolder(reader, footerCount);
-      this.actualCount = 0;
-      this.tempCount = 0;
-    }
-
-    private boolean doSkipHeader(int recordCount) {
-      return !continuance && recordCount < headerCount;
-    }
-
-    private void reset() {
-      tempCount = holderIndex + 1;
-      actualCount = 0;
-      if (!continuance) {
-        footerBuffer.clear();
-      }
-    }
-
-    private Object bufferAdd(Object value) throws SerDeException {
-      footerBuffer.add(value);
-      if (footerBuffer.size() <= footerCount) {
-        return null;
-      }
-      return footerBuffer.poll();
-    }
-
-    private Object getNextValue() {
-      holderIndex = tempCount % getHolderSize();
-      return valueHolder.get(holderIndex);
-    }
-
-    private int getHolderSize() {
-      return valueHolder.size();
-    }
-
-    private void updateContinuance() {
-      this.continuance = actualCount != 0;
-    }
-
-    private int incrementTempCount() {
-      return ++tempCount;
-    }
-
-    private int getActualCount() {
-      return actualCount;
-    }
-
-    private int incrementActualCount() {
-      return ++actualCount;
-    }
-
-    /**
-     * Retrieves positive numeric property from Properties object by name.
-     * Return default value if
-     * 1. file format is absent in predefined file formats list
-     * 2. property doesn't exist in table properties
-     * 3. property value is negative
-     * otherwise casts value to int.
-     *
-     * @param tableProperties property holder
-     * @param propertyName    name of the property
-     * @param defaultValue    default value
-     * @return property numeric value
-     * @throws NumberFormatException if property value is non-numeric
-     */
-    private int retrievePositiveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
-      int propertyIntValue = defaultValue;
-      if (!fileFormats.contains(tableProperties.get(hive_metastoreConstants.FILE_INPUT_FORMAT))) {
-        return propertyIntValue;
-      }
-      Object propertyObject = tableProperties.get(propertyName);
-      if (propertyObject != null) {
-        try {
-          propertyIntValue = Integer.valueOf((String) propertyObject);
-        } catch (NumberFormatException e) {
-          throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric", propertyName, propertyObject.toString()));
-        }
-      }
-      return propertyIntValue < 0 ? defaultValue : propertyIntValue;
-    }
-
-    /**
-     * Creates buffer of objects to be used as values, so these values can be re-used.
-     * Objects number depends on number of lines to skip in the end of the file plus one object.
-     *
-     * @param reader          RecordReader to return value object
-     * @param skipFooterLines number of lines to skip at the end of the file
-     * @return list of objects to be used as values
-     */
-    private List<Object> initializeValueHolder(RecordReader reader, int skipFooterLines) {
-      List<Object> valueHolder = new ArrayList<>(skipFooterLines + 1);
-      for (int i = 0; i <= skipFooterLines; i++) {
-        valueHolder.add(reader.createValue());
-      }
-      return valueHolder;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index eee7343..7aece71 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -17,7 +17,10 @@
  */
 package org.apache.drill.exec.store.hive;
 
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -29,14 +32,33 @@ import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 
 @SuppressWarnings("unused")
 public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
 
+  /**
+   * Use different classes for different Hive native formats:
+   * ORC, AVRO, RCFile, Text and Parquet.
+   * If the input format is none of these, it falls back to the default reader.
+   */
+  static Map<String, Class> readerMap = new HashMap<>();
+  static {
+    readerMap.put(OrcInputFormat.class.getCanonicalName(), HiveOrcReader.class);
+    readerMap.put(AvroContainerInputFormat.class.getCanonicalName(), HiveAvroReader.class);
+    readerMap.put(RCFileInputFormat.class.getCanonicalName(), HiveRCFileReader.class);
+    readerMap.put(MapredParquetInputFormat.class.getCanonicalName(), HiveParquetReader.class);
+    readerMap.put(TextInputFormat.class.getCanonicalName(), HiveTextReader.class);
+  }
+
   @Override
   public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
@@ -51,29 +73,27 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
 
     final HiveConf hiveConf = config.getHiveConf();
 
-    // Native hive text record reader doesn't handle all types currently. For now use HiveRecordReader which uses
-    // Hive InputFormat and SerDe classes to read the data.
-    //if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) &&
-    //        table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) &&
-    //        config.getColumns() != null) {
-    //  for (InputSplit split : splits) {
-    //    readers.add(new HiveTextRecordReader(table,
-    //        (hasPartitions ? partitions.get(i++) : null),
-    //        split, config.getColumns(), context));
-    //  }
-    //} else {
+    final String formatName = table.getSd().getInputFormat();
+    Class<? extends HiveAbstractReader> readerClass = HiveDefaultReader.class;
+    if (readerMap.containsKey(formatName)) {
+      readerClass = readerMap.get(formatName);
+    }
+    Constructor<? extends HiveAbstractReader> readerConstructor = null;
+    try {
+      readerConstructor = readerClass.getConstructor(Table.class, Partition.class,
+          InputSplit.class, List.class, FragmentContext.class, HiveConf.class,
+          UserGroupInformation.class);
       for (InputSplit split : splits) {
-        readers.add(new HiveRecordReader(table,
+        readers.add(readerConstructor.newInstance(table,
             (hasPartitions ? partitions.get(i++) : null), split, config.getColumns(), context, hiveConf, proxyUgi));
       }
-    //}
-
-    // If there are no readers created (which is possible when the table is empty), create an empty RecordReader to
-    // output the schema
-    if (readers.size() == 0) {
-      readers.add(new HiveRecordReader(table, null, null, config.getColumns(), context, hiveConf, proxyUgi));
+      if (readers.size() == 0) {
+        readers.add(readerConstructor.newInstance(
+            table, null, null, config.getColumns(), context, hiveConf, proxyUgi));
+      }
+    } catch(Exception e) {
+      logger.error("No constructor for {}, thrown {}", readerClass.getName(), e);
     }
-
     return new ScanBatch(config, context, readers.iterator());
   }
 }
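
The reworked HiveScanBatchCreator above keys the reader class off the table's input format name and constructs it reflectively, so supporting another native format only requires a new generated reader plus a readerMap entry; anything unrecognized falls back to HiveDefaultReader. A stripped-down sketch of that lookup-and-construct pattern (reader and split types are placeholders, not the Drill ones):

    import java.lang.reflect.Constructor;
    import java.util.HashMap;
    import java.util.Map;

    // Stripped-down sketch of the format-name -> reader-class dispatch used in HiveScanBatchCreator.
    // Reader and split types are placeholders; only the lookup + reflective construction pattern matters.
    public class ReaderDispatchSketch {

      interface Reader { String describe(); }

      static class DefaultReader implements Reader {
        private final String split;
        public DefaultReader(String split) { this.split = split; }
        public String describe() { return "default reader for " + split; }
      }

      static class TextReader implements Reader {
        private final String split;
        public TextReader(String split) { this.split = split; }
        public String describe() { return "text reader for " + split; }
      }

      private static final Map<String, Class<? extends Reader>> READER_MAP = new HashMap<>();
      static {
        READER_MAP.put("org.apache.hadoop.mapred.TextInputFormat", TextReader.class);
      }

      static Reader createReader(String inputFormatName, String split) throws Exception {
        Class<? extends Reader> readerClass =
            READER_MAP.containsKey(inputFormatName) ? READER_MAP.get(inputFormatName) : DefaultReader.class;
        Constructor<? extends Reader> ctor = readerClass.getConstructor(String.class);
        return ctor.newInstance(split);
      }

      public static void main(String[] args) throws Exception {
        System.out.println(createReader("org.apache.hadoop.mapred.TextInputFormat", "split-0").describe());
        System.out.println(createReader("some.other.InputFormat", "split-1").describe());
      }
    }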