Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/23 02:06:48 UTC

[5/8] git commit: DRILL-176: Updates to affinity calculator, fixes for parquet serialization. Fix to ErrorHelper looping

DRILL-176:  Updates to affinity calculator, fixes for parquet serialization.  Fix to ErrorHelper looping


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617

Branch: refs/heads/master
Commit: 7edd36170a9be291a69e44f6090474193485bf14
Parents: d6ae53e
Author: Steven Phillips <sp...@maprtech.com>
Authored: Thu Aug 22 16:18:55 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 22 16:18:55 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/planner/fragment/Wrapper.java    |   5 +-
 .../drill/exec/store/AffinityCalculator.java    |  91 ++++++----
 .../exec/store/parquet/ParquetGroupScan.java    | 177 +++++++++----------
 .../exec/store/parquet/ParquetRecordReader.java |   2 +-
 .../store/parquet/ParquetScanBatchCreator.java  |  10 +-
 .../drill/exec/work/foreman/ErrorHelper.java    |   8 +-
 .../exec/store/TestParquetPhysicalPlan.java     |  55 +++++-
 .../store/parquet/ParquetRecordReaderTest.java  |  52 +++++-
 .../parquet_scan_union_screen_physical.json     |   5 +-
 9 files changed, 257 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index d5a24b0..8c4b0b4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -151,15 +151,12 @@ public class Wrapper {
       for (int i = start; i < start + width; i++) {
         endpoints.add(all.get(i % div));
       }
-    } else if (values.size() < width) {
-      throw new NotImplementedException(
-          "Haven't implemented a scenario where we have some node affinity but the affinity list is smaller than the expected width.");
     } else {
       // get nodes with highest affinity.
       Collections.sort(values);
       values = Lists.reverse(values);
       for (int i = 0; i < width; i++) {
-        endpoints.add(values.get(i).getEndpoint());
+        endpoints.add(values.get(i%values.size()).getEndpoint());
       }
     }
 

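The wrap-around above replaces the old NotImplementedException: when the requested width is larger than the number of endpoints that reported affinity, fragments are now assigned round-robin over the sorted affinity list instead of failing. A minimal standalone sketch of that selection, with plain strings standing in for the real EndpointAffinity/DrillbitEndpoint types (hypothetical names, not Drill's API):

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;

  public class WidthWrapDemo {
    public static void main(String[] args) {
      // Hypothetical input: endpoint names already sorted by descending affinity.
      List<String> byAffinity = Arrays.asList("nodeA", "nodeB");
      int width = 5;  // desired number of minor fragments

      // Wrap around the list so every fragment still gets an endpoint,
      // reusing the highest-affinity nodes once width exceeds the list size.
      List<String> assigned = new ArrayList<String>(width);
      for (int i = 0; i < width; i++) {
        assigned.add(byAffinity.get(i % byAffinity.size()));
      }
      System.out.println(assigned);  // [nodeA, nodeB, nodeA, nodeB, nodeA]
    }
  }
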
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
index b4092cc..b341ea4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
@@ -1,6 +1,7 @@
 package org.apache.drill.exec.store;
 
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableRangeMap;
 import com.google.common.collect.Range;
 import org.apache.drill.exec.store.parquet.ParquetGroupScan;
@@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 public class AffinityCalculator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
@@ -24,6 +26,7 @@ public class AffinityCalculator {
   String fileName;
   Collection<DrillbitEndpoint> endpoints;
   HashMap<String,DrillbitEndpoint> endPointMap;
+  Stopwatch watch = new Stopwatch();
 
   public AffinityCalculator(String fileName, FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
     this.fs = fs;
@@ -33,16 +36,20 @@ public class AffinityCalculator {
     buildEndpointMap();
   }
 
+  /**
+   * Builds a mapping of block locations to file byte range
+   */
   private void buildBlockMap() {
     try {
+      watch.start();
       FileStatus file = fs.getFileStatus(new Path(fileName));
-      long tC = System.nanoTime();
       blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
-      long tD = System.nanoTime();
+      watch.stop();
       logger.debug("Block locations: {}", blocks);
-      logger.debug("Took {} ms to get Block locations", (float)(tD - tC) / 1e6);
+      logger.debug("Took {} ms to get Block locations", watch.elapsed(TimeUnit.MILLISECONDS));
     } catch (IOException ioe) { throw new RuntimeException(ioe); }
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
     for (BlockLocation block : blocks) {
       long start = block.getOffset();
@@ -51,62 +58,72 @@ public class AffinityCalculator {
       blockMapBuilder = blockMapBuilder.put(range, block);
     }
     blockMap = blockMapBuilder.build();
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
+    watch.stop();
+    logger.debug("Took {} ms to build block map", watch.elapsed(TimeUnit.MILLISECONDS));
   }
   /**
+   * For a given RowGroup, calculate how many bytes are available on each on drillbit endpoint
    *
-   * @param entry
+   * @param rowGroup the RowGroup to calculate endpoint bytes for
    */
-  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
-    long tA = System.nanoTime();
+  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
+    watch.reset();
+    watch.start();
     HashMap<String,Long> hostMap = new HashMap<>();
-    long start = entry.getStart();
-    long end = start + entry.getLength();
-    Range<Long> entryRange = Range.closedOpen(start, end);
-    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(entryRange);
-    for (Map.Entry<Range<Long>,BlockLocation> e : subRangeMap.asMapOfRanges().entrySet()) {
-      String[] hosts = null;
-      Range<Long> blockRange = e.getKey();
+    HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
+    long start = rowGroup.getStart();
+    long end = start + rowGroup.getLength();
+    Range<Long> rowGroupRange = Range.closedOpen(start, end);
+
+    // Find submap of ranges that intersect with the rowGroup
+    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
+
+    // Iterate through each block in this submap and get the host for the block location
+    for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
+      String[] hosts;
+      Range<Long> blockRange = block.getKey();
       try {
-        hosts = e.getValue().getHosts();
-      } catch (IOException ioe) { /*TODO Handle this exception */}
-      Range<Long> intersection = entryRange.intersection(blockRange);
+        hosts = block.getValue().getHosts();
+      } catch (IOException ioe) {
+        throw new RuntimeException("Failed to get hosts for block location", ioe);
+      }
+      Range<Long> intersection = rowGroupRange.intersection(blockRange);
       long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
+
+      // For each host in the current block location, add the intersecting bytes to the corresponding endpoint
       for (String host : hosts) {
-        if (hostMap.containsKey(host)) {
-          hostMap.put(host, hostMap.get(host) + bytes);
+        DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
+        if (endpointByteMap.containsKey(endpoint)) {
+          endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) + bytes);
         } else {
-          hostMap.put(host, bytes);
+          if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
         }
       }
     }
-    HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
-    try {
-      for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
-        String host = hostEntry.getKey();
-        Long bytes = hostEntry.getValue();
-        DrillbitEndpoint d = getDrillBitEndpoint(host);
-        if (d != null ) ebs.put(d, bytes);
-      }
-    } catch (NullPointerException n) {}
-    entry.setEndpointBytes(ebs);
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) / 1e6);
+
+    rowGroup.setEndpointBytes(endpointByteMap);
+    rowGroup.setMaxBytes(endpointByteMap.size() > 0 ? Collections.max(endpointByteMap.values()) : 0);
+    logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(), rowGroup.getStart(), rowGroup.getMaxBytes());
+    watch.stop();
+    logger.debug("Took {} ms to set endpoint bytes", watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
   private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
     return endPointMap.get(hostName);
   }
 
+  /**
+   * Builds a mapping of drillbit endpoints to hostnames
+   */
   private void buildEndpointMap() {
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     endPointMap = new HashMap<String, DrillbitEndpoint>();
     for (DrillbitEndpoint d : endpoints) {
       String hostName = d.getAddress();
       endPointMap.put(hostName, d);
     }
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) / 1e6);
+    watch.stop();
+    logger.debug("Took {} ms to build endpoint map", watch.elapsed(TimeUnit.MILLISECONDS));
   }
 }

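The rewritten setEndpointBytes() above works by intersecting a row group's byte range with the file's block locations and summing the overlap per host. A small self-contained sketch of that range arithmetic with Guava's ImmutableRangeMap, using plain host strings in place of BlockLocation and DrillbitEndpoint (hypothetical byte ranges, not Drill code):

  import com.google.common.collect.ImmutableRangeMap;
  import com.google.common.collect.Range;

  import java.util.HashMap;
  import java.util.Map;

  public class BlockIntersectionDemo {
    public static void main(String[] args) {
      // Two hypothetical HDFS blocks: byte range -> host holding that block.
      ImmutableRangeMap<Long, String> blockMap = new ImmutableRangeMap.Builder<Long, String>()
          .put(Range.closedOpen(0L, 128L), "host1")
          .put(Range.closedOpen(128L, 256L), "host2")
          .build();

      // A row group that spans parts of both blocks.
      Range<Long> rowGroup = Range.closedOpen(100L, 200L);

      // Sum, per host, how many bytes of the row group are local to it.
      Map<String, Long> bytesPerHost = new HashMap<String, Long>();
      for (Map.Entry<Range<Long>, String> e :
          blockMap.subRangeMap(rowGroup).asMapOfRanges().entrySet()) {
        Range<Long> overlap = rowGroup.intersection(e.getKey());
        long bytes = overlap.upperEndpoint() - overlap.lowerEndpoint();
        Long previous = bytesPerHost.get(e.getValue());
        bytesPerHost.put(e.getValue(), previous == null ? bytes : previous + bytes);
      }
      System.out.println(bytesPerHost);  // host1 -> 28 bytes, host2 -> 72 bytes
    }
  }

In the commit itself, each host is then resolved to a DrillbitEndpoint through the endpoint map, and the per-endpoint totals plus their maximum are recorded on the RowGroupInfo.
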
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 9e48d33..64ced87 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,14 +18,13 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.physical.EndpointAffinity;
@@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
 public class ParquetGroupScan extends AbstractGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
 
-  private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
+  private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings;
   private List<RowGroupInfo> rowGroupInfos;
+  private Stopwatch watch = new Stopwatch();
 
   public List<ReadEntryWithPath> getEntries() {
     return entries;
@@ -110,16 +110,14 @@ public class ParquetGroupScan extends AbstractGroupScan {
   }
 
   private void readFooter() throws IOException {
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     rowGroupInfos = new ArrayList();
     long start = 0, length = 0;
     ColumnChunkMetaData columnChunkMetaData;
     for (ReadEntryWithPath readEntryWithPath : entries){
       Path path = new Path(readEntryWithPath.getPath());
       ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
-//      FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig());
-//      FileStatus status = fs.getFileStatus(path);
-//      ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
       readEntryWithPath.getPath();
 
       int i = 0;
@@ -138,38 +136,21 @@ public class ParquetGroupScan extends AbstractGroupScan {
         i++;
       }
     }
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to get row group infos", (float)(tB - tA) / 1E6);
+    watch.stop();
+    logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
   private void calculateEndpointBytes() {
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     AffinityCalculator ac = new AffinityCalculator(fileName, fs, availableEndpoints);
     for (RowGroupInfo e : rowGroupInfos) {
       ac.setEndpointBytes(e);
       totalBytes += e.getLength();
     }
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB - tA) / 1E6);
+    watch.stop();
+    logger.debug("Took {} ms to calculate EndpointBytes", watch.elapsed(TimeUnit.MILLISECONDS));
   }
-/*
-  public LinkedList<RowGroupInfo> getRowGroups() {
-    return rowGroups;
-  }
-
-  public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
-    this.rowGroups = rowGroups;
-  }
-
-  public static class ParquetFileReadEntry {
-
-    String path;
-
-    public ParquetFileReadEntry(@JsonProperty String path){
-      this.path = path;
-    }
-  }
-  */
 
   @JsonIgnore
   public FileSystem getFileSystem() {
@@ -232,16 +213,22 @@ public class ParquetGroupScan extends AbstractGroupScan {
     }
   }
 
+  /**
+   *Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each
+   * rowGroup
+   * @return a list of EndpointAffinity objects
+   */
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     if (this.endpointAffinities == null) {
       HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
       for (RowGroupInfo entry : rowGroupInfos) {
         for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
           long bytes = entry.getEndpointBytes().get(d);
           float affinity = (float)bytes / (float)totalBytes;
-          logger.error("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
+          logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
           if (affinities.keySet().contains(d)) {
             affinities.put(d, affinities.get(d) + affinity);
           } else {
@@ -256,83 +243,90 @@ public class ParquetGroupScan extends AbstractGroupScan {
       }
       this.endpointAffinities = affinityList;
     }
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to get operator affinity", (float)(tB - tA) / 1E6);
+    watch.stop();
+    logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
     return this.endpointAffinities;
   }
 
 
+  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
 
-
+  /**
+   *
+   * @param incomingEndpoints
+   */
   @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-    long tA = System.nanoTime();
-    Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
-
-    int i = 0;
-    for (DrillbitEndpoint endpoint : endpoints) {
-      logger.debug("Endpoint index {}, endpoint host: {}", i++, endpoint.getAddress());
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+    watch.reset();
+    watch.start();
+    Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size());
+    mappings = ArrayListMultimap.create();
+    ArrayList rowGroupList = new ArrayList(rowGroupInfos);
+    List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints);
+    for(double cutoff : ASSIGNMENT_CUTOFFS ){
+      scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false);
     }
-
-    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
-    mappings = new LinkedList[endpoints.size()];
-    LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, rowGroupInfos, 100, true, false);
-    LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, unassigned, 50, true, false);
-    LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, unassigned2, 25, true, false);
-    LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, unassigned3, 0, false, false);
-    LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, unassigned4, 0, false, true);
-    assert unassigned5.size() == 0 : String.format("All readEntries should be assigned by now, but some are still unassigned");
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / 1E6);
+    scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
+    watch.stop();
+    logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
+    Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
+    Preconditions.checkArgument(!rowGroupInfos.isEmpty());
   }
 
-  private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean mustContain, boolean assignAll) {
-    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
-    LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
-
-    int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() * 1.5);
+  public int fragmentPointer = 0;
+
+  /**
+   *
+   * @param endpointAssignments the mapping between fragment/endpoint and rowGroup
+   * @param endpoints the list of drillbits, ordered by the corresponding fragment
+   * @param rowGroups the list of rowGroups to assign
+   * @param requiredPercentage the percentage of max bytes required to make an assignment
+   * @param assignAll if true, will assign even if no affinity
+   */
+  private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) {
+    Collections.sort(rowGroups, new ParquetReadEntryComparator());
+    final boolean requireAffinity = requiredPercentage > 0;
+    int maxAssignments = (int) (rowGroups.size() / endpoints.size());
+
+    if (maxAssignments < 1) maxAssignments = 1;
+
+    for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); iter.hasNext();){
+      RowGroupInfo rowGroupInfo = iter.next();
+      for (int i = 0; i < endpoints.size(); i++) {
+        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
+        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
+        Map<DrillbitEndpoint, Long> bytesPerEndpoint = rowGroupInfo.getEndpointBytes();
+        boolean haveAffinity = bytesPerEndpoint.containsKey(currentEndpoint) ;
 
-    if (maxEntries < 1) maxEntries = 1;
-
-    int i =0;
-    for(RowGroupInfo e : rowGroups) {
-      boolean assigned = false;
-      for (int j = i; j < i + endpoints.size(); j++) {
-        DrillbitEndpoint currentEndpoint = endpoints.get(j%endpoints.size());
         if (assignAll ||
-                (e.getEndpointBytes().size() > 0 &&
-                (e.getEndpointBytes().containsKey(currentEndpoint) || !mustContain) &&
-                (mappings[j%endpoints.size()] == null || mappings[j%endpoints.size()].size() < maxEntries) &&
-                e.getEndpointBytes().get(currentEndpoint) >= e.getMaxBytes() * requiredPercentage / 100)) {
-          LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = mappings[j%endpoints.size()];
-          if(entries == null){
-            entries = new LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
-            mappings[j%endpoints.size()] = entries;
-          }
-          entries.add(e.getRowGroupReadEntry());
-          logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", e.getPath(), e.getStart(), currentEndpoint.getAddress());
-          assigned = true;
+                (!bytesPerEndpoint.isEmpty() &&
+                        (!requireAffinity || haveAffinity) &&
+                        (!endpointAssignments.containsKey(minorFragmentId) || endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
+                        bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage)) {
+
+          endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry());
+          logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress());
+          iter.remove();
+          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
           break;
         }
       }
-      if (!assigned) unassigned.add(e);
-      i++;
+
     }
-    return unassigned;
   }
 
   @Override
   public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
-    assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
-    for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) {
+    assert minorFragmentId < mappings.size() : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), minorFragmentId);
+    for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings.get(minorFragmentId)) {
       logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
     }
+    Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
     try {
-      return new ParquetRowGroupScan(storageEngine, engineConfig, mappings[minorFragmentId]);
+      return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId));
     } catch (SetupException e) {
-      e.printStackTrace(); // TODO - fix this
+      throw new RuntimeException("Error setting up ParquetRowGroupScan", e);
     }
-    return null;
   }
 
   @Override
@@ -342,7 +336,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
 
   @Override
   public OperatorCost getCost() {
-    return new OperatorCost(1,1,1,1);
+    //TODO Figure out how to properly calculate cost
+    return new OperatorCost(1,rowGroupInfos.size(),1,1);
   }
 
   @Override

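applyAssignments()/scanAndAssign() above implement a multi-pass placement: a row group is only given to a fragment whose endpoint holds at least cutoff * maxBytes of it, the cutoff relaxes from 0.99 down to 0.01 across passes, and a final assignAll pass places anything still unassigned. A compact sketch of that idea, with a plain map standing in for RowGroupInfo and the Multimap (hypothetical data, not the Drill classes):

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  public class CutoffAssignmentDemo {
    // Same cutoffs as above: demand 99%, then 50%, 25%, 1% of a row group's best
    // locality before assigning it; a last pass sweeps up the remainder.
    static final double[] CUTOFFS = {0.99, 0.50, 0.25, 0.01};

    public static void main(String[] args) {
      // Hypothetical input: row group name -> bytes local to endpoint 0 and endpoint 1.
      Map<String, long[]> rowGroups = new LinkedHashMap<String, long[]>();
      rowGroups.put("rg0", new long[]{100, 0});  // fully local to endpoint 0
      rowGroups.put("rg1", new long[]{40, 60});  // mostly on endpoint 1
      rowGroups.put("rg2", new long[]{0, 0});    // no locality information at all

      Map<Integer, List<String>> assignments = new HashMap<Integer, List<String>>();
      for (double cutoff : CUTOFFS) {
        assignPass(rowGroups, assignments, cutoff, false);
      }
      assignPass(rowGroups, assignments, 0.0, true);  // assign whatever is left
      System.out.println(assignments);                // e.g. {0=[rg0, rg2], 1=[rg1]}
    }

    static void assignPass(Map<String, long[]> rowGroups, Map<Integer, List<String>> assignments,
                           double cutoff, boolean assignAll) {
      Iterator<Map.Entry<String, long[]>> it = rowGroups.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<String, long[]> rg = it.next();
        long[] bytes = rg.getValue();
        long maxBytes = Math.max(bytes[0], bytes[1]);
        for (int endpoint = 0; endpoint < bytes.length; endpoint++) {
          if (assignAll || (maxBytes > 0 && bytes[endpoint] >= maxBytes * cutoff)) {
            List<String> list = assignments.get(endpoint);
            if (list == null) {
              list = new ArrayList<String>();
              assignments.put(endpoint, list);
            }
            list.add(rg.getKey());
            it.remove();  // each row group is assigned exactly once
            break;
          }
        }
      }
    }
  }

The real scanAndAssign() additionally caps how many row groups a single fragment may take (maxAssignments) and rotates fragmentPointer so assignments spread across minor fragments.
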
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 4e46034..3aaa987 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -211,8 +211,8 @@ public class ParquetRecordReader implements RecordReader {
       }
       for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
         output.addField(r.valueVecHolder.getValueVector());
-        output.setNewSchema();
       }
+      output.setNewSchema();
     }catch(SchemaChangeException e) {
       throw new ExecutionSetupException("Error setting up output mutator.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 03fb4ec..addd288 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Stopwatch;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
 
 public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
 
   @Override
   public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
-    long tA = System.nanoTime(), tB;
-    System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ScanBatCreator.scanBatch");
+    Stopwatch watch = new Stopwatch();
+    watch.start();
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
     for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
@@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
         throw new ExecutionSetupException(e1);
       }
     }
-    System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9));
+    logger.debug("total time in ScanBatchCreator.getBatch: {} ms", watch.elapsed(TimeUnit.MILLISECONDS));
     return new ScanBatch(context, readers.iterator());
   }
 }

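The System.out timing removed above is replaced by the same Guava Stopwatch pattern the rest of this commit adopts. A minimal standalone sketch of that pattern (Thread.sleep stands in for the timed work; newer Guava versions spell the constructor Stopwatch.createUnstarted()):

  import com.google.common.base.Stopwatch;

  import java.util.concurrent.TimeUnit;

  public class StopwatchTimingDemo {
    public static void main(String[] args) throws InterruptedException {
      Stopwatch watch = new Stopwatch();  // constructor style used in this commit
      watch.start();
      Thread.sleep(25);                   // stand-in for the work being timed
      watch.stop();
      System.out.println("took " + watch.elapsed(TimeUnit.MILLISECONDS) + " ms");

      // reset() lets the same instance time the next section, as the other classes here do.
      watch.reset();
      watch.start();
      Thread.sleep(10);
      watch.stop();
      System.out.println("took " + watch.elapsed(TimeUnit.MILLISECONDS) + " ms");
    }
  }
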
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
index 9a33109..72c5f34 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
@@ -35,8 +35,8 @@ public class ErrorHelper {
     if(message != null){
       sb.append(message);
     }
-      
-    do{
+
+    while (true) {
       sb.append(" < ");
       sb.append(t.getClass().getSimpleName());
       if(t.getMessage() != null){
@@ -44,7 +44,9 @@ public class ErrorHelper {
         sb.append(t.getMessage());
         sb.append(" ]");
       }
-    }while(t.getCause() != null && t.getCause() != t);
+      if (t.getCause() == null || t.getCause() == t) break;
+      t = t.getCause();
+    }
     
     builder.setMessage(sb.toString());
     

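The change above is the "Fix to ErrorHelper looping" from the commit message: the old do-while never reassigned t, so any throwable with a non-null cause spun forever. A self-contained sketch of the corrected cause-chain walk (hypothetical method and class names, mirroring the loop above):

  public class CauseChainDemo {
    static String describe(Throwable t, String message) {
      StringBuilder sb = new StringBuilder();
      if (message != null) {
        sb.append(message);
      }
      while (true) {
        sb.append(" < ").append(t.getClass().getSimpleName());
        if (t.getMessage() != null) {
          sb.append("[ ").append(t.getMessage()).append(" ]");
        }
        // Advance down the cause chain; stop on a null or self-referential cause.
        if (t.getCause() == null || t.getCause() == t) {
          break;
        }
        t = t.getCause();
      }
      return sb.toString();
    }

    public static void main(String[] args) {
      Exception inner = new IllegalStateException("bad state");
      Exception outer = new RuntimeException("wrapper", inner);
      System.out.println(describe(outer, "query failed"));
      // query failed < RuntimeException[ wrapper ] < IllegalStateException[ bad state ]
    }
  }
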
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
index e2a00f1..18ac294 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
@@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +35,7 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import static junit.framework.Assert.assertNull;
 import static org.junit.Assert.assertEquals;
@@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan {
 
   //public String fileName = "/physical_test2.json";
   public String fileName = "parquet_scan_union_screen_physical.json";
+//  public String fileName = "parquet-sample.json";
+
 
   @Test
   @Ignore
@@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan {
       bit1.run();
       client.connect();
       List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
-      System.out.println(String.format("Got %d results", results.size()));
+      RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
+      for (QueryResultBatch b : results) {
+        System.out.println(String.format("Got %d results", b.getHeader().getRowCount()));
+        loader.load(b.getHeader().getDef(), b.getData());
+        for (VectorWrapper vw : loader) {
+          System.out.println(vw.getValueVector().getField().getName());
+          ValueVector vv = vw.getValueVector();
+          for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
+            Object o = vv.getAccessor().getObject(i);
+            System.out.println(vv.getAccessor().getObject(i));
+          }
+        }
+      }
+      client.close();
+    }
+  }
+
+  private class ParquetResultsListener implements UserResultsListener {
+    private CountDownLatch latch = new CountDownLatch(1);
+    @Override
+    public void submissionFailed(RpcException ex) {
+      logger.error("submission failed", ex);
+      latch.countDown();
+    }
+
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      System.out.printf("Result batch arrived. Number of records: %d", result.getHeader().getRowCount());
+      if (result.getHeader().getIsLastChunk()) latch.countDown();
+    }
+
+    public void await() throws Exception {
+      latch.await();
+    }
+  }
+  @Test
+  @Ignore
+  public void testParseParquetPhysicalPlanRemote() throws Exception {
+    DrillConfig config = DrillConfig.create();
+
+    try(DrillClient client = new DrillClient(config);){
+      client.connect();
+      ParquetResultsListener listener = new ParquetResultsListener();
+      client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), listener);
+      listener.await();
       client.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 1d91455..7a99c3f 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import parquet.bytes.BytesInput;
@@ -68,6 +69,7 @@ public class ParquetRecordReaderTest {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
 
   private boolean VERBOSE_DEBUG = false;
+  private boolean checkValues = true;
 
   static final int numberRowGroups = 20;
   static final int recordsPerRowGroup = 300000;
@@ -100,6 +102,20 @@ public class ParquetRecordReaderTest {
     testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup);
   }
 
+  @Test
+  @Ignore
+  public void testLocalDistributed() throws Exception {
+    String planName = "/parquet/parquet_scan_union_screen_physical.json";
+    testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, 300000);
+  }
+
+  @Test
+  @Ignore
+  public void testRemoteDistributed() throws Exception {
+    String planName = "/parquet/parquet_scan_union_screen_physical.json";
+    testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
+  }
+
 
   private class ParquetResultListener implements UserResultsListener {
     private SettableFuture<Void> future = SettableFuture.create();
@@ -155,8 +171,12 @@ public class ParquetRecordReaderTest {
           if (VERBOSE_DEBUG){
             System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
           }
-          assertField(vv, j, (TypeProtos.MinorType) currentField.type,
-              currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
+          if (checkValues) {
+            try {
+              assertField(vv, j, (TypeProtos.MinorType) currentField.type,
+                currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
+            } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
+          }
           columnValCounter++;
         }
         if (VERBOSE_DEBUG){
@@ -197,7 +217,9 @@ public class ParquetRecordReaderTest {
       batchCounter++;
       if(result.getHeader().getIsLastChunk()){
         for (String s : valuesChecked.keySet()) {
+          try {
           assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s));
+          } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
         }
         
         assert valuesChecked.keySet().size() > 0;
@@ -222,11 +244,13 @@ public class ParquetRecordReaderTest {
     
     DrillConfig config = DrillConfig.create();
 
+    checkValues = false;
+
     try(DrillClient client = new DrillClient(config);){
       client.connect();
       RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
       ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
-      client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
+      client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
       resultListener.get();
     }
     
@@ -259,6 +283,28 @@ public class ParquetRecordReaderTest {
   }
 
 
+  //use this method to submit physical plan
+  public void testParquetFullEngineLocalTextDistributed(String planName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
+
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    checkValues = false;
+
+    DrillConfig config = DrillConfig.create();
+
+    try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){
+      bit1.run();
+      client.connect();
+      RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
+      ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
+      Stopwatch watch = new Stopwatch().start();
+      client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8), resultListener);
+      resultListener.get();
+      System.out.println(String.format("Took %d ms to run query", watch.elapsed(TimeUnit.MILLISECONDS)));
+
+    }
+
+  }
 
   public String pad(String value, int length) {
     return pad(value, length, " ");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
index f508d09..5efecaf 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
@@ -11,10 +11,7 @@
     @id : 1,
     entries : [
     {
-        path : "/tmp/testParquetFile_many_types_3"
-    },
-    {
-        path : "/tmp/testParquetFile_many_types_3"
+        path : "/tmp/parquet_test_file_many_types"
     }
     ],
     storageengine:{


Re: [5/8] git commit: DRILL-176: Updates to affinity calculator, fixes for parquet serialization. Fix to ErrorHelper looping

Posted by Tanujit Ghosh <ta...@gmail.com>.
A more detailed output; I just ran that particular test this time:

[tanujit@legolas java-exec]$ mvn -Dtest=ParquetRecordReaderTest test
[INFO] Scanning for projects...
[INFO]

[INFO]
------------------------------------------------------------------------
[INFO] Building java-exec 1.0-SNAPSHOT
[INFO]
------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:copy-resources (copy-resources) @
java-exec ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 369 resources
[INFO]
[INFO] --- maven-enforcer-plugin:1.2:enforce (no_commons_logging) @
java-exec ---
[INFO]
[INFO] --- maven-antrun-plugin:1.6:run (generate-sources) @ java-exec ---
[WARNING] Parameter tasks is deprecated, use target instead
[INFO] Executing tasks

main:
[INFO] Executed tasks
[INFO] Registering compile source root
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/generated-sources
[INFO]
[INFO] --- fmpp-maven-plugin:1.0:generate (generate-sources) @ java-exec ---
- Executing: ValueHolders.java
log4j:WARN No appenders could be found for logger (freemarker.cache).
log4j:WARN Please initialize the log4j system properly.
- Executing: VariableLengthVectors.java
- Executing: FixedValueVectors.java
- Executing: TypeHelper.java
- Executing: NullableValueVectors.java
- Executing: RepeatedValueVectors.java
[INFO] Done
[INFO]
[INFO] --- maven-remote-resources-plugin:1.1:process (default) @ java-exec
---
[INFO] Setting property: classpath.resource.loader.class =>
'org.codehaus.plexus.velocity.ContextClassLoaderResourceLoader'.
[INFO] Setting property: velocimacro.messages.on => 'false'.
[INFO] Setting property: resource.loader => 'classpath'.
[INFO] Setting property: resource.manager.logwhenfound => 'false'.
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @
java-exec ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.0:compile (default-compile) @ java-exec
---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 459 source files to
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/classes
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java:[20,21]
com.sun.corba.se.impl.interceptors.CodecFactoryImpl is internal proprietary
API and may be removed in a future release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java:
Some input files use or override a deprecated API.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java:
Recompile with -Xlint:deprecation for details.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java:
Some input files use unchecked or unsafe operations.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java:
Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources)
@ java-exec ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 36 resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.0:testCompile (default-testCompile) @
java-exec ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 126 source files to
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/test-classes
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[25,16]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[53,17]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[94,12]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[95,14]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[98,26]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[100,27]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java:
Some input files use or override a deprecated API.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java:
Recompile with -Xlint:deprecation for details.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java:
Some input files use unchecked or unsafe operations.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java:
Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- maven-surefire-plugin:2.15:test (default-test) @ java-exec ---
[INFO] Surefire report directory:
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
08:18:15.598 [main] DEBUG org.reflections.Reflections - going to scan these
urls:
jar:file:/home/tanujit/.m2/repository/org/apache/drill/common/1.0-SNAPSHOT/common-1.0-SNAPSHOT.jar!/
file:/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/test-classes/
file:/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/classes/
jar:file:/home/tanujit/.m2/repository/org/apache/drill/common/1.0-SNAPSHOT/common-1.0-SNAPSHOT-tests.jar!/

08:18:16.124 [main] INFO  org.reflections.Reflections - Reflections took
524 ms to scan 4 urls, producing 667 keys and 1620 values
08:18:16.148 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as
the default logging framework
08:18:16.150 [main] DEBUG i.n.c.MultithreadEventLoopGroup -
-Dio.netty.eventLoopThreads: 8
08:18:16.165 [main] DEBUG i.n.util.internal.PlatformDependent - UID: 1000
08:18:16.165 [main] DEBUG i.n.util.internal.PlatformDependent - Java
version: 7
08:18:16.166 [main] DEBUG i.n.util.internal.PlatformDependent -
-Dio.netty.noUnsafe: false
08:18:16.167 [main] DEBUG i.n.util.internal.PlatformDependent0 -
java.nio.ByteBuffer.cleaner: available
08:18:16.167 [main] DEBUG i.n.util.internal.PlatformDependent0 -
java.nio.Buffer.address: available
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 -
sun.misc.Unsafe.theUnsafe: available
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 -
sun.misc.Unsafe.copyMemory: available
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 -
java.nio.Bits.unaligned: true
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent -
sun.misc.Unsafe: available
08:18:16.169 [main] DEBUG i.n.util.internal.PlatformDependent -
-Dio.netty.noJavassist: false
08:18:16.227 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist:
available
08:18:16.228 [main] DEBUG i.n.util.internal.PlatformDependent -
-Dio.netty.noPreferDirect: false
08:18:16.249 [main] DEBUG io.netty.channel.nio.NioEventLoop -
-Dio.netty.noKeySetOptimization: false
08:18:16.249 [main] DEBUG io.netty.channel.nio.NioEventLoop -
-Dio.netty.selectorAutoRebuildThreshold: 512
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.numHeapArenas: 4
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.numDirectArenas: 4
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.pageSize: 8192
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.maxOrder: 11
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.chunkSize: 16777216
08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo
08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback address:
/0:0:0:0:0:0:0:1%1 (primary)
08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback address: /
127.0.0.1
08:18:16.335 [main] DEBUG io.netty.util.NetUtil -
/proc/sys/net/core/somaxconn: 128
08:18:16.344 [UserServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x30e3b962] REGISTERED
08:18:16.347 [UserServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x30e3b962] BIND(0.0.0.0/0.0.0.0:31010)
08:18:16.350 [UserServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x30e3b962, /0:0:0:0:0:0:0:0:31010] ACTIVE
08:18:16.366 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b] REGISTERED
08:18:16.367 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b] BIND(0.0.0.0/0.0.0.0:31011)
08:18:16.367 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] ACTIVE
08:18:16.571 [Client-1] DEBUG
i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated:
io.netty.util.internal.__matchers__.io.netty.buffer.ByteBufMatcher
08:18:16.574 [Client-1] DEBUG
i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated:
io.netty.util.internal.__matchers__.org.apache.drill.exec.rpc.OutboundRpcMessageMatcher
08:18:16.576 [Client-1] DEBUG
i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated:
io.netty.util.internal.__matchers__.org.apache.drill.exec.rpc.InboundRpcMessageMatcher
08:18:16.579 [UserServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x30e3b962, /0:0:0:0:0:0:0:0:31010] RECEIVED: [id: 0xf33275f9, /
192.168.0.102:34995 => /192.168.0.102:31010]
08:18:16.589 [Client-1] DEBUG io.netty.util.ResourceLeakDetector -
-Dio.netty.noResourceLeakDetection: false
08:18:16.770 [WorkManager-1] DEBUG org.apache.hadoop.security.Groups -
 Creating new Groups object
08:18:16.817 [WorkManager-1] DEBUG org.apache.hadoop.security.Groups -
Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
cacheTimeout=300000
08:18:16.882 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
hadoop login
08:18:16.882 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
hadoop login commit
08:18:16.898 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
using local user:UnixPrincipal: tanujit
08:18:16.900 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
UGI loginUser:tanujit
08:18:16.940 [WorkManager-1] DEBUG org.apache.hadoop.fs.FileSystem -
Creating filesystem for file:///
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
08:18:17.922 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x1f148baa, /
192.168.0.102:45778 => /192.168.0.102:31011]
08:18:17.923 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xc4ed7542, /
192.168.0.102:45779 => /192.168.0.102:31011]
08:18:17.937 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x70a4a44c, /
192.168.0.102:45780 => /192.168.0.102:31011]
08:18:17.937 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xf6360725, /
192.168.0.102:45781 => /192.168.0.102:31011]
08:18:17.937 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x51467583, /
192.168.0.102:45782 => /192.168.0.102:31011]
08:18:17.938 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xaf6d72df, /
192.168.0.102:45783 => /192.168.0.102:31011]
08:18:17.938 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x6a8a1670, /
192.168.0.102:45784 => /192.168.0.102:31011]
08:18:17.938 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xf18205a3, /
192.168.0.102:45785 => /192.168.0.102:31011]
08:18:17.939 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xa09abc2c, /
192.168.0.102:45786 => /192.168.0.102:31011]
08:18:17.943 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x3784f526, /
192.168.0.102:45787 => /192.168.0.102:31011]





On Sat, Aug 24, 2013 at 8:15 AM, Tanujit Ghosh <ta...@gmail.com> wrote:

> Hi,
>
> when I try mvn install after these changes, the
> org.apache.drill.exec.store.parquet.ParquetRecordReaderTest is hanging.
>
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.596 sec
> - in org.apache.drill.exec.expr.ExpressionTest
> Running org.apache.drill.exec.store.TestAffinityCalculator
> Took 0.616287 ms to build range map
> Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.137 sec
> - in org.apache.drill.exec.store.TestAffinityCalculator
> Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
>
> Environment is Fedora 18, OpenJDK 1.7
>
> With skipTests everything compiles fine.
>
> Regards
> Tanujit
>
>
>
> On Fri, Aug 23, 2013 at 5:36 AM, <ja...@apache.org> wrote:
>
>> DRILL-176:  Updates to affinity calculator, fixes for parquet
>> serialization.  Fix to ErrorHelper looping
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
>> Commit:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
>> Tree:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
>> Diff:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617
>>
>> Branch: refs/heads/master
>> Commit: 7edd36170a9be291a69e44f6090474193485bf14
>> Parents: d6ae53e
>> Author: Steven Phillips <sp...@maprtech.com>
>> Authored: Thu Aug 22 16:18:55 2013 -0700
>> Committer: Jacques Nadeau <ja...@apache.org>
>> Committed: Thu Aug 22 16:18:55 2013 -0700
>>
>> ----------------------------------------------------------------------
>>  .../drill/exec/planner/fragment/Wrapper.java    |   5 +-
>>  .../drill/exec/store/AffinityCalculator.java    |  91 ++++++----
>>  .../exec/store/parquet/ParquetGroupScan.java    | 177 +++++++++----------
>>  .../exec/store/parquet/ParquetRecordReader.java |   2 +-
>>  .../store/parquet/ParquetScanBatchCreator.java  |  10 +-
>>  .../drill/exec/work/foreman/ErrorHelper.java    |   8 +-
>>  .../exec/store/TestParquetPhysicalPlan.java     |  55 +++++-
>>  .../store/parquet/ParquetRecordReaderTest.java  |  52 +++++-
>>  .../parquet_scan_union_screen_physical.json     |   5 +-
>>  9 files changed, 257 insertions(+), 148 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> index d5a24b0..8c4b0b4 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> @@ -151,15 +151,12 @@ public class Wrapper {
>>        for (int i = start; i < start + width; i++) {
>>          endpoints.add(all.get(i % div));
>>        }
>> -    } else if (values.size() < width) {
>> -      throw new NotImplementedException(
>> -          "Haven't implemented a scenario where we have some node
>> affinity but the affinity list is smaller than the expected width.");
>>      } else {
>>        // get nodes with highest affinity.
>>        Collections.sort(values);
>>        values = Lists.reverse(values);
>>        for (int i = 0; i < width; i++) {
>> -        endpoints.add(values.get(i).getEndpoint());
>> +        endpoints.add(values.get(i%values.size()).getEndpoint());
>>        }
>>      }
>>
>>
>>
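The Wrapper.java hunk above drops the NotImplementedException and instead cycles through the affinity-sorted endpoint list whenever the requested parallelization width exceeds the number of endpoints that have any affinity. A minimal sketch of that wrap-around selection, with plain host-name strings standing in for DrillbitEndpoint (names and values here are illustrative only):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch of the modulo wrap-around pick: when width exceeds the number of
    // candidates, earlier candidates are simply reused in round-robin order.
    public class WrapAroundPickSketch {

      static <T> List<T> pick(List<T> sortedByAffinityDesc, int width) {
        List<T> chosen = new ArrayList<T>(width);
        for (int i = 0; i < width; i++) {
          chosen.add(sortedByAffinityDesc.get(i % sortedByAffinityDesc.size()));
        }
        return chosen;
      }

      public static void main(String[] args) {
        List<String> hosts = Arrays.asList("nodeA", "nodeB");
        // Width 5 with only two affine nodes now cycles instead of throwing:
        System.out.println(pick(hosts, 5)); // [nodeA, nodeB, nodeA, nodeB, nodeA]
      }
    }
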
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> index b4092cc..b341ea4 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> @@ -1,6 +1,7 @@
>>  package org.apache.drill.exec.store;
>>
>>
>> +import com.google.common.base.Stopwatch;
>>  import com.google.common.collect.ImmutableRangeMap;
>>  import com.google.common.collect.Range;
>>  import org.apache.drill.exec.store.parquet.ParquetGroupScan;
>> @@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
>>
>>  import java.io.IOException;
>>  import java.util.*;
>> +import java.util.concurrent.TimeUnit;
>>
>>  public class AffinityCalculator {
>>    static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
>> @@ -24,6 +26,7 @@ public class AffinityCalculator {
>>    String fileName;
>>    Collection<DrillbitEndpoint> endpoints;
>>    HashMap<String,DrillbitEndpoint> endPointMap;
>> +  Stopwatch watch = new Stopwatch();
>>
>>    public AffinityCalculator(String fileName, FileSystem fs,
>> Collection<DrillbitEndpoint> endpoints) {
>>      this.fs = fs;
>> @@ -33,16 +36,20 @@ public class AffinityCalculator {
>>      buildEndpointMap();
>>    }
>>
>> +  /**
>> +   * Builds a mapping of block locations to file byte range
>> +   */
>>    private void buildBlockMap() {
>>      try {
>> +      watch.start();
>>        FileStatus file = fs.getFileStatus(new Path(fileName));
>> -      long tC = System.nanoTime();
>>        blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
>> -      long tD = System.nanoTime();
>> +      watch.stop();
>>        logger.debug("Block locations: {}", blocks);
>> -      logger.debug("Took {} ms to get Block locations", (float)(tD - tC)
>> / 1e6);
>> +      logger.debug("Took {} ms to get Block locations",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>      } catch (IOException ioe) { throw new RuntimeException(ioe); }
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new
>> ImmutableRangeMap.Builder<Long,BlockLocation>();
>>      for (BlockLocation block : blocks) {
>>        long start = block.getOffset();
>> @@ -51,62 +58,72 @@ public class AffinityCalculator {
>>        blockMapBuilder = blockMapBuilder.put(range, block);
>>      }
>>      blockMap = blockMapBuilder.build();
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to build block map", (float)(tB - tA) /
>> 1e6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to build block map",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>>    /**
>> +   * For a given RowGroup, calculate how many bytes are available on
>> each on drillbit endpoint
>>     *
>> -   * @param entry
>> +   * @param rowGroup the RowGroup to calculate endpoint bytes for
>>     */
>> -  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
>> -    long tA = System.nanoTime();
>> +  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
>> +    watch.reset();
>> +    watch.start();
>>      HashMap<String,Long> hostMap = new HashMap<>();
>> -    long start = entry.getStart();
>> -    long end = start + entry.getLength();
>> -    Range<Long> entryRange = Range.closedOpen(start, end);
>> -    ImmutableRangeMap<Long,BlockLocation> subRangeMap =
>> blockMap.subRangeMap(entryRange);
>> -    for (Map.Entry<Range<Long>,BlockLocation> e :
>> subRangeMap.asMapOfRanges().entrySet()) {
>> -      String[] hosts = null;
>> -      Range<Long> blockRange = e.getKey();
>> +    HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
>> +    long start = rowGroup.getStart();
>> +    long end = start + rowGroup.getLength();
>> +    Range<Long> rowGroupRange = Range.closedOpen(start, end);
>> +
>> +    // Find submap of ranges that intersect with the rowGroup
>> +    ImmutableRangeMap<Long,BlockLocation> subRangeMap =
>> blockMap.subRangeMap(rowGroupRange);
>> +
>> +    // Iterate through each block in this submap and get the host for
>> the block location
>> +    for (Map.Entry<Range<Long>,BlockLocation> block :
>> subRangeMap.asMapOfRanges().entrySet()) {
>> +      String[] hosts;
>> +      Range<Long> blockRange = block.getKey();
>>        try {
>> -        hosts = e.getValue().getHosts();
>> -      } catch (IOException ioe) { /*TODO Handle this exception */}
>> -      Range<Long> intersection = entryRange.intersection(blockRange);
>> +        hosts = block.getValue().getHosts();
>> +      } catch (IOException ioe) {
>> +        throw new RuntimeException("Failed to get hosts for block
>> location", ioe);
>> +      }
>> +      Range<Long> intersection = rowGroupRange.intersection(blockRange);
>>        long bytes = intersection.upperEndpoint() -
>> intersection.lowerEndpoint();
>> +
>> +      // For each host in the current block location, add the
>> intersecting bytes to the corresponding endpoint
>>        for (String host : hosts) {
>> -        if (hostMap.containsKey(host)) {
>> -          hostMap.put(host, hostMap.get(host) + bytes);
>> +        DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
>> +        if (endpointByteMap.containsKey(endpoint)) {
>> +          endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) +
>> bytes);
>>          } else {
>> -          hostMap.put(host, bytes);
>> +          if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
>>          }
>>        }
>>      }
>> -    HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
>> -    try {
>> -      for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
>> -        String host = hostEntry.getKey();
>> -        Long bytes = hostEntry.getValue();
>> -        DrillbitEndpoint d = getDrillBitEndpoint(host);
>> -        if (d != null ) ebs.put(d, bytes);
>> -      }
>> -    } catch (NullPointerException n) {}
>> -    entry.setEndpointBytes(ebs);
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) /
>> 1e6);
>> +
>> +    rowGroup.setEndpointBytes(endpointByteMap);
>> +    rowGroup.setMaxBytes(endpointByteMap.size() > 0 ?
>> Collections.max(endpointByteMap.values()) : 0);
>> +    logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(),
>> rowGroup.getStart(), rowGroup.getMaxBytes());
>> +    watch.stop();
>> +    logger.debug("Took {} ms to set endpoint bytes",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>>
>>    private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
>>      return endPointMap.get(hostName);
>>    }
>>
>> +  /**
>> +   * Builds a mapping of drillbit endpoints to hostnames
>> +   */
>>    private void buildEndpointMap() {
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      endPointMap = new HashMap<String, DrillbitEndpoint>();
>>      for (DrillbitEndpoint d : endpoints) {
>>        String hostName = d.getAddress();
>>        endPointMap.put(hostName, d);
>>      }
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) /
>> 1e6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to build endpoint map",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>>  }
>>
>>
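The rewritten setEndpointBytes() walks a Guava ImmutableRangeMap from byte-offset ranges to HDFS block locations, intersects each overlapping block range with the row group's range, and credits the intersection size to every host that stores the block. The following self-contained sketch shows that overlap accounting, with host-name strings standing in for BlockLocation/DrillbitEndpoint and made-up byte ranges:

    import com.google.common.collect.ImmutableRangeMap;
    import com.google.common.collect.Range;

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the per-endpoint byte accounting done in setEndpointBytes().
    public class BlockOverlapSketch {
      public static void main(String[] args) {
        // Two fictional 128-byte blocks and the hosts that store them.
        ImmutableRangeMap<Long, String[]> blockMap =
            new ImmutableRangeMap.Builder<Long, String[]>()
                .put(Range.closedOpen(0L, 128L), new String[]{"host1", "host2"})
                .put(Range.closedOpen(128L, 256L), new String[]{"host2", "host3"})
                .build();

        // A row group spanning bytes [100, 200) overlaps both blocks.
        Range<Long> rowGroup = Range.closedOpen(100L, 200L);
        Map<String, Long> bytesPerHost = new HashMap<String, Long>();

        for (Map.Entry<Range<Long>, String[]> block :
            blockMap.subRangeMap(rowGroup).asMapOfRanges().entrySet()) {
          Range<Long> overlap = rowGroup.intersection(block.getKey());
          long bytes = overlap.upperEndpoint() - overlap.lowerEndpoint();
          for (String host : block.getValue()) {
            Long previous = bytesPerHost.get(host);
            bytesPerHost.put(host, previous == null ? bytes : previous + bytes);
          }
        }
        // host1=28, host2=100, host3=72 (map iteration order may vary)
        System.out.println(bytesPerHost);
      }
    }
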
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> index 9e48d33..64ced87 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> @@ -18,14 +18,13 @@
>>  package org.apache.drill.exec.store.parquet;
>>
>>  import java.io.IOException;
>> -import java.util.ArrayList;
>> -import java.util.Collection;
>> -import java.util.Collections;
>> -import java.util.Comparator;
>> -import java.util.HashMap;
>> -import java.util.LinkedList;
>> -import java.util.List;
>> +import java.util.*;
>> +import java.util.concurrent.TimeUnit;
>>
>> +import com.google.common.base.Stopwatch;
>> +import com.google.common.collect.ArrayListMultimap;
>> +import com.google.common.collect.Lists;
>> +import com.google.common.collect.Multimap;
>>  import org.apache.drill.common.config.DrillConfig;
>>  import org.apache.drill.exec.exception.SetupException;
>>  import org.apache.drill.exec.physical.EndpointAffinity;
>> @@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
>>  public class ParquetGroupScan extends AbstractGroupScan {
>>    static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
>>
>> -  private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
>> +  private ArrayListMultimap<Integer,
>> ParquetRowGroupScan.RowGroupReadEntry> mappings;
>>    private List<RowGroupInfo> rowGroupInfos;
>> +  private Stopwatch watch = new Stopwatch();
>>
>>    public List<ReadEntryWithPath> getEntries() {
>>      return entries;
>> @@ -110,16 +110,14 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>    }
>>
>>    private void readFooter() throws IOException {
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      rowGroupInfos = new ArrayList();
>>      long start = 0, length = 0;
>>      ColumnChunkMetaData columnChunkMetaData;
>>      for (ReadEntryWithPath readEntryWithPath : entries){
>>        Path path = new Path(readEntryWithPath.getPath());
>>        ParquetMetadata footer =
>> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
>> -//      FileSystem fs =
>> FileSystem.get(this.storageEngine.getHadoopConfig());
>> -//      FileStatus status = fs.getFileStatus(path);
>> -//      ParquetMetadata footer =
>> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
>>        readEntryWithPath.getPath();
>>
>>        int i = 0;
>> @@ -138,38 +136,21 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>          i++;
>>        }
>>      }
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to get row group infos", (float)(tB - tA) /
>> 1E6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to get row group infos",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>>
>>    private void calculateEndpointBytes() {
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      AffinityCalculator ac = new AffinityCalculator(fileName, fs,
>> availableEndpoints);
>>      for (RowGroupInfo e : rowGroupInfos) {
>>        ac.setEndpointBytes(e);
>>        totalBytes += e.getLength();
>>      }
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB -
>> tA) / 1E6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to calculate EndpointBytes",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>> -/*
>> -  public LinkedList<RowGroupInfo> getRowGroups() {
>> -    return rowGroups;
>> -  }
>> -
>> -  public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
>> -    this.rowGroups = rowGroups;
>> -  }
>> -
>> -  public static class ParquetFileReadEntry {
>> -
>> -    String path;
>> -
>> -    public ParquetFileReadEntry(@JsonProperty String path){
>> -      this.path = path;
>> -    }
>> -  }
>> -  */
>>
>>    @JsonIgnore
>>    public FileSystem getFileSystem() {
>> @@ -232,16 +213,22 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>      }
>>    }
>>
>> +  /**
>> +   *Calculates the affinity each endpoint has for this scan, by adding
>> up the affinity each endpoint has for each
>> +   * rowGroup
>> +   * @return a list of EndpointAffinity objects
>> +   */
>>    @Override
>>    public List<EndpointAffinity> getOperatorAffinity() {
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      if (this.endpointAffinities == null) {
>>        HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
>>        for (RowGroupInfo entry : rowGroupInfos) {
>>          for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
>>            long bytes = entry.getEndpointBytes().get(d);
>>            float affinity = (float)bytes / (float)totalBytes;
>> -          logger.error("RowGroup: {} Endpoint: {} Bytes: {}",
>> entry.getRowGroupIndex(), d.getAddress(), bytes);
>> +          logger.debug("RowGroup: {} Endpoint: {} Bytes: {}",
>> entry.getRowGroupIndex(), d.getAddress(), bytes);
>>            if (affinities.keySet().contains(d)) {
>>              affinities.put(d, affinities.get(d) + affinity);
>>            } else {
>> @@ -256,83 +243,90 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>        }
>>        this.endpointAffinities = affinityList;
>>      }
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to get operator affinity", (float)(tB - tA)
>> / 1E6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to get operator affinity",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>      return this.endpointAffinities;
>>    }
>>
>>
>> +  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
>>
>> -
>> +  /**
>> +   *
>> +   * @param incomingEndpoints
>> +   */
>>    @Override
>> -  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
>> -    long tA = System.nanoTime();
>> -    Preconditions.checkArgument(endpoints.size() <=
>> rowGroupInfos.size());
>> -
>> -    int i = 0;
>> -    for (DrillbitEndpoint endpoint : endpoints) {
>> -      logger.debug("Endpoint index {}, endpoint host: {}", i++,
>> endpoint.getAddress());
>> +  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints)
>> {
>> +    watch.reset();
>> +    watch.start();
>> +    Preconditions.checkArgument(incomingEndpoints.size() <=
>> rowGroupInfos.size());
>> +    mappings = ArrayListMultimap.create();
>> +    ArrayList rowGroupList = new ArrayList(rowGroupInfos);
>> +    List<DrillbitEndpoint> endpointLinkedlist =
>> Lists.newLinkedList(incomingEndpoints);
>> +    for(double cutoff : ASSIGNMENT_CUTOFFS ){
>> +      scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff,
>> false);
>>      }
>> -
>> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
>> -    mappings = new LinkedList[endpoints.size()];
>> -    LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints,
>> rowGroupInfos, 100, true, false);
>> -    LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints,
>> unassigned, 50, true, false);
>> -    LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints,
>> unassigned2, 25, true, false);
>> -    LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints,
>> unassigned3, 0, false, false);
>> -    LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints,
>> unassigned4, 0, false, true);
>> -    assert unassigned5.size() == 0 : String.format("All readEntries
>> should be assigned by now, but some are still unassigned");
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) /
>> 1E6);
>> +    scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to apply assignments",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>> +    Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries
>> should be assigned by now, but some are still unassigned");
>> +    Preconditions.checkArgument(!rowGroupInfos.isEmpty());
>>    }
>>
>> -  private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint>
>> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean
>> mustContain, boolean assignAll) {
>> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
>> -    LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
>> -
>> -    int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() *
>> 1.5);
>> +  public int fragmentPointer = 0;
>> +
>> +  /**
>> +   *
>> +   * @param endpointAssignments the mapping between fragment/endpoint
>> and rowGroup
>> +   * @param endpoints the list of drillbits, ordered by the
>> corresponding fragment
>> +   * @param rowGroups the list of rowGroups to assign
>> +   * @param requiredPercentage the percentage of max bytes required to
>> make an assignment
>> +   * @param assignAll if true, will assign even if no affinity
>> +   */
>> +  private void scanAndAssign (Multimap<Integer,
>> ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments,
>> List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double
>> requiredPercentage, boolean assignAll) {
>> +    Collections.sort(rowGroups, new ParquetReadEntryComparator());
>> +    final boolean requireAffinity = requiredPercentage > 0;
>> +    int maxAssignments = (int) (rowGroups.size() / endpoints.size());
>> +
>> +    if (maxAssignments < 1) maxAssignments = 1;
>> +
>> +    for(Iterator<RowGroupInfo> iter = rowGroups.iterator();
>> iter.hasNext();){
>> +      RowGroupInfo rowGroupInfo = iter.next();
>> +      for (int i = 0; i < endpoints.size(); i++) {
>> +        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
>> +        DrillbitEndpoint currentEndpoint =
>> endpoints.get(minorFragmentId);
>> +        Map<DrillbitEndpoint, Long> bytesPerEndpoint =
>> rowGroupInfo.getEndpointBytes();
>> +        boolean haveAffinity =
>> bytesPerEndpoint.containsKey(currentEndpoint) ;
>>
>> -    if (maxEntries < 1) maxEntries = 1;
>> -
>> -    int i =0;
>> -    for(RowGroupInfo e : rowGroups) {
>> -      boolean assigned = false;
>> -      for (int j = i; j < i + endpoints.size(); j++) {
>> -        DrillbitEndpoint currentEndpoint =
>> endpoints.get(j%endpoints.size());
>>          if (assignAll ||
>> -                (e.getEndpointBytes().size() > 0 &&
>> -                (e.getEndpointBytes().containsKey(currentEndpoint) ||
>> !mustContain) &&
>> -                (mappings[j%endpoints.size()] == null ||
>> mappings[j%endpoints.size()].size() < maxEntries) &&
>> -                e.getEndpointBytes().get(currentEndpoint) >=
>> e.getMaxBytes() * requiredPercentage / 100)) {
>> -          LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries =
>> mappings[j%endpoints.size()];
>> -          if(entries == null){
>> -            entries = new
>> LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
>> -            mappings[j%endpoints.size()] = entries;
>> -          }
>> -          entries.add(e.getRowGroupReadEntry());
>> -          logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}",
>> e.getPath(), e.getStart(), currentEndpoint.getAddress());
>> -          assigned = true;
>> +                (!bytesPerEndpoint.isEmpty() &&
>> +                        (!requireAffinity || haveAffinity) &&
>> +
>>  (!endpointAssignments.containsKey(minorFragmentId) ||
>> endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
>> +                        bytesPerEndpoint.get(currentEndpoint) >=
>> rowGroupInfo.getMaxBytes() * requiredPercentage)) {
>> +
>> +          endpointAssignments.put(minorFragmentId,
>> rowGroupInfo.getRowGroupReadEntry());
>> +          logger.debug("Assigned rowGroup {} to minorFragmentId {}
>> endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId,
>> endpoints.get(minorFragmentId).getAddress());
>> +          iter.remove();
>> +          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
>>            break;
>>          }
>>        }
>> -      if (!assigned) unassigned.add(e);
>> -      i++;
>> +
>>      }
>> -    return unassigned;
>>    }
>>
>>    @Override
>>    public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
>> -    assert minorFragmentId < mappings.length : String.format("Mappings
>> length [%d] should be longer than minor fragment id [%d] but it isn't.",
>> mappings.length, minorFragmentId);
>> -    for (ParquetRowGroupScan.RowGroupReadEntry rg :
>> mappings[minorFragmentId]) {
>> +    assert minorFragmentId < mappings.size() : String.format("Mappings
>> length [%d] should be longer than minor fragment id [%d] but it isn't.",
>> mappings.size(), minorFragmentId);
>> +    for (ParquetRowGroupScan.RowGroupReadEntry rg :
>> mappings.get(minorFragmentId)) {
>>        logger.debug("minorFragmentId: {} Path: {} RowGroupIndex:
>> {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
>>      }
>> +
>>  Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(),
>> String.format("MinorFragmentId %d has no read entries assigned",
>> minorFragmentId));
>>      try {
>> -      return new ParquetRowGroupScan(storageEngine, engineConfig,
>> mappings[minorFragmentId]);
>> +      return new ParquetRowGroupScan(storageEngine, engineConfig,
>> mappings.get(minorFragmentId));
>>      } catch (SetupException e) {
>> -      e.printStackTrace(); // TODO - fix this
>> +      throw new RuntimeException("Error setting up ParquetRowGroupScan",
>> e);
>>      }
>> -    return null;
>>    }
>>
>>    @Override
>> @@ -342,7 +336,8 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>
>>    @Override
>>    public OperatorCost getCost() {
>> -    return new OperatorCost(1,1,1,1);
>> +    //TODO Figure out how to properly calculate cost
>> +    return new OperatorCost(1,rowGroupInfos.size(),1,1);
>>    }
>>
>>    @Override
>>
>>
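applyAssignments() now makes several passes over the unassigned row groups with decreasing affinity cutoffs (ASSIGNMENT_CUTOFFS) followed by a final assign-everything pass, collecting results in an ArrayListMultimap keyed by minor fragment id. The skeleton below is a heavily simplified sketch of that multi-pass loop: it drops the per-fragment cap and the fragmentPointer round-robin from the real code, and each "row group" is reduced to an array of locally available bytes per fragment.

    import com.google.common.collect.ArrayListMultimap;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    // Skeleton of the tiered assignment: sweep the remaining row groups with
    // progressively lower affinity cutoffs, then assign whatever is left.
    public class TieredAssignmentSketch {
      static final double[] CUTOFFS = {0.99, 0.50, 0.25, 0.01};
      static final long MAX_BYTES = 100;   // pretend every row group has maxBytes = 100

      public static void main(String[] args) {
        // localBytes[fragmentId] for three fictional row groups.
        List<long[]> remaining = new ArrayList<long[]>(Arrays.asList(
            new long[]{100, 0}, new long[]{30, 60}, new long[]{0, 0}));
        ArrayListMultimap<Integer, long[]> assignments = ArrayListMultimap.create();

        for (double cutoff : CUTOFFS) {
          assign(assignments, remaining, cutoff, false);
        }
        assign(assignments, remaining, 0.0, true);   // final pass takes the leftovers

        System.out.println("fragment 0 -> " + assignments.get(0).size() + " row groups");
        System.out.println("fragment 1 -> " + assignments.get(1).size() + " row groups");
      }

      static void assign(ArrayListMultimap<Integer, long[]> assignments,
                         List<long[]> remaining, double cutoff, boolean assignAll) {
        for (Iterator<long[]> it = remaining.iterator(); it.hasNext(); ) {
          long[] localBytes = it.next();
          for (int fragment = 0; fragment < localBytes.length; fragment++) {
            if (assignAll || localBytes[fragment] >= MAX_BYTES * cutoff) {
              assignments.put(fragment, localBytes);
              it.remove();
              break;
            }
          }
        }
      }
    }
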
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> index 4e46034..3aaa987 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> @@ -211,8 +211,8 @@ public class ParquetRecordReader implements
>> RecordReader {
>>        }
>>        for (VarLenBinaryReader.VarLengthColumn r :
>> varLengthReader.columns) {
>>          output.addField(r.valueVecHolder.getValueVector());
>> -        output.setNewSchema();
>>        }
>> +      output.setNewSchema();
>>      }catch(SchemaChangeException e) {
>>        throw new ExecutionSetupException("Error setting up output
>> mutator.", e);
>>      }
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> index 03fb4ec..addd288 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> @@ -21,7 +21,9 @@ import java.io.IOException;
>>  import java.text.SimpleDateFormat;
>>  import java.util.Date;
>>  import java.util.List;
>> +import java.util.concurrent.TimeUnit;
>>
>> +import com.google.common.base.Stopwatch;
>>  import org.apache.drill.common.exceptions.ExecutionSetupException;
>>  import org.apache.drill.exec.ops.FragmentContext;
>>  import org.apache.drill.exec.physical.impl.BatchCreator;
>> @@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
>>  import parquet.hadoop.metadata.ParquetMetadata;
>>
>>  public class ParquetScanBatchCreator implements
>> BatchCreator<ParquetRowGroupScan>{
>> -  static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
>> +  static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
>>
>>    @Override
>>    public RecordBatch getBatch(FragmentContext context,
>> ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws
>> ExecutionSetupException {
>> -    long tA = System.nanoTime(), tB;
>> -    System.out.println( new SimpleDateFormat("mm:ss S").format(new
>> Date()) + " :Start of ScanBatCreator.scanBatch");
>> +    Stopwatch watch = new Stopwatch();
>> +    watch.start();
>>      Preconditions.checkArgument(children.isEmpty());
>>      List<RecordReader> readers = Lists.newArrayList();
>>      for(ParquetRowGroupScan.RowGroupReadEntry e :
>> rowGroupScan.getRowGroupReadEntries()){
>> @@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements
>> BatchCreator<ParquetRowGroupScan
>>          throw new ExecutionSetupException(e1);
>>        }
>>      }
>> -    System.out.println( "Total time in method: " + ((float)
>> (System.nanoTime() - tA) / 1e9));
>> +    logger.debug("total time in ScanBatchCreator.getBatch: {} ms",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>      return new ScanBatch(context, readers.iterator());
>>    }
>>  }
>>
>>
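Throughout the commit, ad-hoc System.nanoTime() arithmetic is replaced with a Guava Stopwatch, as in ParquetScanBatchCreator above. A minimal sketch of the reset/start/stop/elapsed pattern follows; the no-arg constructor matches the Guava version used in this tree, while newer Guava releases deprecate it in favour of Stopwatch.createUnstarted()/createStarted().

    import com.google.common.base.Stopwatch;

    import java.util.concurrent.TimeUnit;

    // Minimal sketch of the Stopwatch timing pattern this commit introduces.
    public class StopwatchSketch {
      public static void main(String[] args) throws InterruptedException {
        Stopwatch watch = new Stopwatch();
        watch.start();
        Thread.sleep(25);                 // stand-in for the timed work
        watch.stop();
        System.out.println("phase 1 took " + watch.elapsed(TimeUnit.MILLISECONDS) + " ms");

        watch.reset();                    // reuse the same instance for the next phase
        watch.start();
        Thread.sleep(10);
        watch.stop();
        System.out.println("phase 2 took " + watch.elapsed(TimeUnit.MILLISECONDS) + " ms");
      }
    }
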
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> index 9a33109..72c5f34 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> @@ -35,8 +35,8 @@ public class ErrorHelper {
>>      if(message != null){
>>        sb.append(message);
>>      }
>> -
>> -    do{
>> +
>> +    while (true) {
>>        sb.append(" < ");
>>        sb.append(t.getClass().getSimpleName());
>>        if(t.getMessage() != null){
>> @@ -44,7 +44,9 @@ public class ErrorHelper {
>>          sb.append(t.getMessage());
>>          sb.append(" ]");
>>        }
>> -    }while(t.getCause() != null && t.getCause() != t);
>> +      if (t.getCause() == null || t.getCause() == t) break;
>> +      t = t.getCause();
>> +    }
>>
>>      builder.setMessage(sb.toString());
>>
>>
>>
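The ErrorHelper change replaces the do/while, which never advanced t and could therefore loop forever, with a while (true) loop that moves to t.getCause() on every iteration and breaks on a null or self-referential cause. A standalone sketch of the same traversal:

    // Sketch of the cause-chain walk the ErrorHelper fix performs: advance
    // through getCause() and stop on null or a self-referential cause, so the
    // loop terminates and each cause is appended exactly once.
    public class CauseChainSketch {
      static String describe(Throwable t) {
        StringBuilder sb = new StringBuilder();
        while (true) {
          sb.append(" < ").append(t.getClass().getSimpleName());
          if (t.getMessage() != null) {
            sb.append(":[ ").append(t.getMessage()).append(" ]");
          }
          if (t.getCause() == null || t.getCause() == t) break;
          t = t.getCause();
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        Throwable inner = new IllegalStateException("disk full");
        Throwable outer = new RuntimeException("query failed", inner);
        System.out.println(describe(outer));
        // " < RuntimeException:[ query failed ] < IllegalStateException:[ disk full ]"
      }
    }
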
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> index e2a00f1..18ac294 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> @@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan;
>>  import org.apache.drill.exec.planner.PhysicalPlanReader;
>>  import org.apache.drill.exec.proto.CoordinationProtos;
>>  import org.apache.drill.exec.proto.UserProtos;
>> +import org.apache.drill.exec.record.RecordBatchLoader;
>> +import org.apache.drill.exec.record.VectorWrapper;
>> +import org.apache.drill.exec.rpc.RpcException;
>>  import org.apache.drill.exec.rpc.user.QueryResultBatch;
>> +import org.apache.drill.exec.rpc.user.UserResultsListener;
>> +import org.apache.drill.exec.server.BootStrapContext;
>>  import org.apache.drill.exec.server.Drillbit;
>>  import org.apache.drill.exec.server.RemoteServiceSet;
>>  import org.apache.drill.exec.store.parquet.ParquetGroupScan;
>> +import org.apache.drill.exec.vector.ValueVector;
>>  import org.apache.hadoop.fs.BlockLocation;
>>  import org.apache.hadoop.fs.FileStatus;
>>  import org.apache.hadoop.fs.FileSystem;
>> @@ -29,6 +35,7 @@ import java.io.IOException;
>>  import java.nio.charset.Charset;
>>  import java.util.LinkedList;
>>  import java.util.List;
>> +import java.util.concurrent.CountDownLatch;
>>
>>  import static junit.framework.Assert.assertNull;
>>  import static org.junit.Assert.assertEquals;
>> @@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan {
>>
>>    //public String fileName = "/physical_test2.json";
>>    public String fileName = "parquet_scan_union_screen_physical.json";
>> +//  public String fileName = "parquet-sample.json";
>> +
>>
>>    @Test
>>    @Ignore
>> @@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan {
>>        bit1.run();
>>        client.connect();
>>        List<QueryResultBatch> results =
>> client.runQuery(UserProtos.QueryType.PHYSICAL,
>> Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
>> -      System.out.println(String.format("Got %d results",
>> results.size()));
>> +      RecordBatchLoader loader = new
>> RecordBatchLoader(bit1.getContext().getAllocator());
>> +      for (QueryResultBatch b : results) {
>> +        System.out.println(String.format("Got %d results",
>> b.getHeader().getRowCount()));
>> +        loader.load(b.getHeader().getDef(), b.getData());
>> +        for (VectorWrapper vw : loader) {
>> +          System.out.println(vw.getValueVector().getField().getName());
>> +          ValueVector vv = vw.getValueVector();
>> +          for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
>> +            Object o = vv.getAccessor().getObject(i);
>> +            System.out.println(vv.getAccessor().getObject(i));
>> +          }
>> +        }
>> +      }
>> +      client.close();
>> +    }
>> +  }
>> +
>> +  private class ParquetResultsListener implements UserResultsListener {
>> +    private CountDownLatch latch = new CountDownLatch(1);
>> +    @Override
>> +    public void submissionFailed(RpcException ex) {
>> +      logger.error("submission failed", ex);
>> +      latch.countDown();
>> +    }
>> +
>> +    @Override
>> +    public void resultArrived(QueryResultBatch result) {
>> +      System.out.printf("Result batch arrived. Number of records: %d",
>> result.getHeader().getRowCount());
>> +      if (result.getHeader().getIsLastChunk()) latch.countDown();
>> +    }
>> +
>> +    public void await() throws Exception {
>> +      latch.await();
>> +    }
>> +  }
>> +  @Test
>> +  @Ignore
>> +  public void testParseParquetPhysicalPlanRemote() throws Exception {
>> +    DrillConfig config = DrillConfig.create();
>> +
>> +    try(DrillClient client = new DrillClient(config);){
>> +      client.connect();
>> +      ParquetResultsListener listener = new ParquetResultsListener();
>> +      client.runQuery(UserProtos.QueryType.PHYSICAL,
>> Resources.toString(Resources.getResource(fileName),Charsets.UTF_8),
>> listener);
>> +      listener.await();
>>        client.close();
>>      }
>>    }
>>
>>
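The new remote-plan test uses a results listener that counts down a CountDownLatch when the last chunk (or a failure) arrives, so the test thread can simply block in await(). The generic sketch below shows the same pattern without Drill's classes; the ResultCallback interface is invented purely for illustration, whereas the real code implements UserResultsListener.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Generic version of the latch pattern used by ParquetResultsListener above:
    // an async callback releases the waiting test thread once the final result
    // (or a failure) has been seen.
    public class LatchListenerSketch {

      interface ResultCallback {
        void onResult(String result, boolean last);
        void onFailure(Exception e);
      }

      static class AwaitingListener implements ResultCallback {
        private final CountDownLatch latch = new CountDownLatch(1);

        public void onResult(String result, boolean last) {
          System.out.println("got: " + result);
          if (last) latch.countDown();      // release the waiter on the final chunk
        }

        public void onFailure(Exception e) {
          e.printStackTrace();
          latch.countDown();                // never leave the test thread hanging
        }

        void await(long seconds) throws InterruptedException {
          latch.await(seconds, TimeUnit.SECONDS);
        }
      }

      public static void main(String[] args) throws Exception {
        final AwaitingListener listener = new AwaitingListener();
        new Thread(new Runnable() {
          public void run() {
            listener.onResult("batch 1", false);
            listener.onResult("batch 2", true);
          }
        }).start();
        listener.await(10);
        System.out.println("done");
      }
    }
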
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> index 1d91455..7a99c3f 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> @@ -48,6 +48,7 @@ import
>> org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
>>  import org.apache.drill.exec.vector.BaseDataValueVector;
>>  import org.apache.drill.exec.vector.ValueVector;
>>  import org.junit.BeforeClass;
>> +import org.junit.Ignore;
>>  import org.junit.Test;
>>
>>  import parquet.bytes.BytesInput;
>> @@ -68,6 +69,7 @@ public class ParquetRecordReaderTest {
>>    static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
>>
>>    private boolean VERBOSE_DEBUG = false;
>> +  private boolean checkValues = true;
>>
>>    static final int numberRowGroups = 20;
>>    static final int recordsPerRowGroup = 300000;
>> @@ -100,6 +102,20 @@ public class ParquetRecordReaderTest {
>>      testParquetFullEngineLocalText(planText, fileName, i,
>> numberRowGroups, recordsPerRowGroup);
>>    }
>>
>> +  @Test
>> +  @Ignore
>> +  public void testLocalDistributed() throws Exception {
>> +    String planName = "/parquet/parquet_scan_union_screen_physical.json";
>> +    testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20,
>> 300000);
>> +  }
>> +
>> +  @Test
>> +  @Ignore
>> +  public void testRemoteDistributed() throws Exception {
>> +    String planName = "/parquet/parquet_scan_union_screen_physical.json";
>> +    testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
>> +  }
>> +
>>
>>    private class ParquetResultListener implements UserResultsListener {
>>      private SettableFuture<Void> future = SettableFuture.create();
>> @@ -155,8 +171,12 @@ public class ParquetRecordReaderTest {
>>            if (VERBOSE_DEBUG){
>>              System.out.print(vv.getAccessor().getObject(j) + ", " + (j %
>> 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
>>            }
>> -          assertField(vv, j, (TypeProtos.MinorType) currentField.type,
>> -              currentField.values[(int) (columnValCounter % 3)],
>> (String) currentField.name + "/");
>> +          if (checkValues) {
>> +            try {
>> +              assertField(vv, j, (TypeProtos.MinorType)
>> currentField.type,
>> +                currentField.values[(int) (columnValCounter % 3)],
>> (String) currentField.name + "/");
>> +            } catch (AssertionError e) { submissionFailed(new
>> RpcException(e)); }
>> +          }
>>            columnValCounter++;
>>          }
>>          if (VERBOSE_DEBUG){
>> @@ -197,7 +217,9 @@ public class ParquetRecordReaderTest {
>>        batchCounter++;
>>        if(result.getHeader().getIsLastChunk()){
>>          for (String s : valuesChecked.keySet()) {
>> +          try {
>>            assertEquals("Record count incorrect for column: " + s,
>> totalRecords, (long) valuesChecked.get(s));
>> +          } catch (AssertionError e) { submissionFailed(new
>> RpcException(e)); }
>>          }
>>
>>          assert valuesChecked.keySet().size() > 0;
>> @@ -222,11 +244,13 @@ public class ParquetRecordReaderTest {
>>
>>      DrillConfig config = DrillConfig.create();
>>
>> +    checkValues = false;
>> +
>>      try(DrillClient client = new DrillClient(config);){
>>        client.connect();
>>        RecordBatchLoader batchLoader = new
>> RecordBatchLoader(client.getAllocator());
>>        ParquetResultListener resultListener = new
>> ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups,
>> numberOfTimesRead);
>> -      client.runQuery(UserProtos.QueryType.LOGICAL,
>> Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8),
>> resultListener);
>> +      client.runQuery(UserProtos.QueryType.PHYSICAL,
>> Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8),
>> resultListener);
>>        resultListener.get();
>>      }
>>
>> @@ -259,6 +283,28 @@ public class ParquetRecordReaderTest {
>>    }
>>
>>
>> +  //use this method to submit physical plan
>> +  public void testParquetFullEngineLocalTextDistributed(String planName,
>> String filename, int numberOfTimesRead /* specified in json plan */, int
>> numberOfRowGroups, int recordsPerRowGroup) throws Exception{
>> +
>> +    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
>> +
>> +    checkValues = false;
>> +
>> +    DrillConfig config = DrillConfig.create();
>> +
>> +    try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient
>> client = new DrillClient(config, serviceSet.getCoordinator());){
>> +      bit1.run();
>> +      client.connect();
>> +      RecordBatchLoader batchLoader = new
>> RecordBatchLoader(client.getAllocator());
>> +      ParquetResultListener resultListener = new
>> ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups,
>> numberOfTimesRead);
>> +      Stopwatch watch = new Stopwatch().start();
>> +      client.runQuery(UserProtos.QueryType.PHYSICAL,
>> Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8),
>> resultListener);
>> +      resultListener.get();
>> +      System.out.println(String.format("Took %d ms to run query",
>> watch.elapsed(TimeUnit.MILLISECONDS)));
>> +
>> +    }
>> +
>> +  }
>>
>>    public String pad(String value, int length) {
>>      return pad(value, length, " ");
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> index f508d09..5efecaf 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> +++
>> b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> @@ -11,10 +11,7 @@
>>      @id : 1,
>>      entries : [
>>      {
>> -        path : "/tmp/testParquetFile_many_types_3"
>> -    },
>> -    {
>> -        path : "/tmp/testParquetFile_many_types_3"
>> +        path : "/tmp/parquet_test_file_many_types"
>>      }
>>      ],
>>      storageengine:{
>>
>>
>
>
> --
> Regards,
> Tanujit
>



-- 
Regards,
Tanujit

Re: [5/8] git commit: DRILL-176: Updates to affinity calculator, fixes for parquet serialization. Fix to ErrorHelper looping

Posted by Tanujit Ghosh <ta...@gmail.com>.
Hi,

When I try mvn install after these changes,
org.apache.drill.exec.store.parquet.ParquetRecordReaderTest hangs.

Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.596 sec -
in org.apache.drill.exec.expr.ExpressionTest
Running org.apache.drill.exec.store.TestAffinityCalculator
Took 0.616287 ms to build range map
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.137 sec -
in org.apache.drill.exec.store.TestAffinityCalculator
Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.

Environment is Fedora 18, OpenJDK 1.7.

With skipTests, everything compiles fine.

Regards
Tanujit



On Fri, Aug 23, 2013 at 5:36 AM, <ja...@apache.org> wrote:

> DRILL-176:  Updates to affinity calculator, fixes for parquet
> serialization.  Fix to ErrorHelper looping
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617
>
> Branch: refs/heads/master
> Commit: 7edd36170a9be291a69e44f6090474193485bf14
> Parents: d6ae53e
> Author: Steven Phillips <sp...@maprtech.com>
> Authored: Thu Aug 22 16:18:55 2013 -0700
> Committer: Jacques Nadeau <ja...@apache.org>
> Committed: Thu Aug 22 16:18:55 2013 -0700
>
> ----------------------------------------------------------------------
>  .../drill/exec/planner/fragment/Wrapper.java    |   5 +-
>  .../drill/exec/store/AffinityCalculator.java    |  91 ++++++----
>  .../exec/store/parquet/ParquetGroupScan.java    | 177 +++++++++----------
>  .../exec/store/parquet/ParquetRecordReader.java |   2 +-
>  .../store/parquet/ParquetScanBatchCreator.java  |  10 +-
>  .../drill/exec/work/foreman/ErrorHelper.java    |   8 +-
>  .../exec/store/TestParquetPhysicalPlan.java     |  55 +++++-
>  .../store/parquet/ParquetRecordReaderTest.java  |  52 +++++-
>  .../parquet_scan_union_screen_physical.json     |   5 +-
>  9 files changed, 257 insertions(+), 148 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> index d5a24b0..8c4b0b4 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> @@ -151,15 +151,12 @@ public class Wrapper {
>        for (int i = start; i < start + width; i++) {
>          endpoints.add(all.get(i % div));
>        }
> -    } else if (values.size() < width) {
> -      throw new NotImplementedException(
> -          "Haven't implemented a scenario where we have some node
> affinity but the affinity list is smaller than the expected width.");
>      } else {
>        // get nodes with highest affinity.
>        Collections.sort(values);
>        values = Lists.reverse(values);
>        for (int i = 0; i < width; i++) {
> -        endpoints.add(values.get(i).getEndpoint());
> +        endpoints.add(values.get(i%values.size()).getEndpoint());
>        }
>      }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> index b4092cc..b341ea4 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> @@ -1,6 +1,7 @@
>  package org.apache.drill.exec.store;
>
>
> +import com.google.common.base.Stopwatch;
>  import com.google.common.collect.ImmutableRangeMap;
>  import com.google.common.collect.Range;
>  import org.apache.drill.exec.store.parquet.ParquetGroupScan;
> @@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
>
>  import java.io.IOException;
>  import java.util.*;
> +import java.util.concurrent.TimeUnit;
>
>  public class AffinityCalculator {
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
> @@ -24,6 +26,7 @@ public class AffinityCalculator {
>    String fileName;
>    Collection<DrillbitEndpoint> endpoints;
>    HashMap<String,DrillbitEndpoint> endPointMap;
> +  Stopwatch watch = new Stopwatch();
>
>    public AffinityCalculator(String fileName, FileSystem fs,
> Collection<DrillbitEndpoint> endpoints) {
>      this.fs = fs;
> @@ -33,16 +36,20 @@ public class AffinityCalculator {
>      buildEndpointMap();
>    }
>
> +  /**
> +   * Builds a mapping of block locations to file byte range
> +   */
>    private void buildBlockMap() {
>      try {
> +      watch.start();
>        FileStatus file = fs.getFileStatus(new Path(fileName));
> -      long tC = System.nanoTime();
>        blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
> -      long tD = System.nanoTime();
> +      watch.stop();
>        logger.debug("Block locations: {}", blocks);
> -      logger.debug("Took {} ms to get Block locations", (float)(tD - tC)
> / 1e6);
> +      logger.debug("Took {} ms to get Block locations",
> watch.elapsed(TimeUnit.MILLISECONDS));
>      } catch (IOException ioe) { throw new RuntimeException(ioe); }
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new
> ImmutableRangeMap.Builder<Long,BlockLocation>();
>      for (BlockLocation block : blocks) {
>        long start = block.getOffset();
> @@ -51,62 +58,72 @@ public class AffinityCalculator {
>        blockMapBuilder = blockMapBuilder.put(range, block);
>      }
>      blockMap = blockMapBuilder.build();
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
> +    watch.stop();
> +    logger.debug("Took {} ms to build block map",
> watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>    /**
> +   * For a given RowGroup, calculate how many bytes are available on each
> on drillbit endpoint
>     *
> -   * @param entry
> +   * @param rowGroup the RowGroup to calculate endpoint bytes for
>     */
> -  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
> -    long tA = System.nanoTime();
> +  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
> +    watch.reset();
> +    watch.start();
>      HashMap<String,Long> hostMap = new HashMap<>();
> -    long start = entry.getStart();
> -    long end = start + entry.getLength();
> -    Range<Long> entryRange = Range.closedOpen(start, end);
> -    ImmutableRangeMap<Long,BlockLocation> subRangeMap =
> blockMap.subRangeMap(entryRange);
> -    for (Map.Entry<Range<Long>,BlockLocation> e :
> subRangeMap.asMapOfRanges().entrySet()) {
> -      String[] hosts = null;
> -      Range<Long> blockRange = e.getKey();
> +    HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
> +    long start = rowGroup.getStart();
> +    long end = start + rowGroup.getLength();
> +    Range<Long> rowGroupRange = Range.closedOpen(start, end);
> +
> +    // Find submap of ranges that intersect with the rowGroup
> +    ImmutableRangeMap<Long,BlockLocation> subRangeMap =
> blockMap.subRangeMap(rowGroupRange);
> +
> +    // Iterate through each block in this submap and get the host for the
> block location
> +    for (Map.Entry<Range<Long>,BlockLocation> block :
> subRangeMap.asMapOfRanges().entrySet()) {
> +      String[] hosts;
> +      Range<Long> blockRange = block.getKey();
>        try {
> -        hosts = e.getValue().getHosts();
> -      } catch (IOException ioe) { /*TODO Handle this exception */}
> -      Range<Long> intersection = entryRange.intersection(blockRange);
> +        hosts = block.getValue().getHosts();
> +      } catch (IOException ioe) {
> +        throw new RuntimeException("Failed to get hosts for block
> location", ioe);
> +      }
> +      Range<Long> intersection = rowGroupRange.intersection(blockRange);
>        long bytes = intersection.upperEndpoint() -
> intersection.lowerEndpoint();
> +
> +      // For each host in the current block location, add the
> intersecting bytes to the corresponding endpoint
>        for (String host : hosts) {
> -        if (hostMap.containsKey(host)) {
> -          hostMap.put(host, hostMap.get(host) + bytes);
> +        DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
> +        if (endpointByteMap.containsKey(endpoint)) {
> +          endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) +
> bytes);
>          } else {
> -          hostMap.put(host, bytes);
> +          if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
>          }
>        }
>      }
> -    HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
> -    try {
> -      for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
> -        String host = hostEntry.getKey();
> -        Long bytes = hostEntry.getValue();
> -        DrillbitEndpoint d = getDrillBitEndpoint(host);
> -        if (d != null ) ebs.put(d, bytes);
> -      }
> -    } catch (NullPointerException n) {}
> -    entry.setEndpointBytes(ebs);
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) /
> 1e6);
> +
> +    rowGroup.setEndpointBytes(endpointByteMap);
> +    rowGroup.setMaxBytes(endpointByteMap.size() > 0 ?
> Collections.max(endpointByteMap.values()) : 0);
> +    logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(),
> rowGroup.getStart(), rowGroup.getMaxBytes());
> +    watch.stop();
> +    logger.debug("Took {} ms to set endpoint bytes",
> watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>
>    private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
>      return endPointMap.get(hostName);
>    }
>
> +  /**
> +   * Builds a mapping of drillbit endpoints to hostnames
> +   */
>    private void buildEndpointMap() {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      endPointMap = new HashMap<String, DrillbitEndpoint>();
>      for (DrillbitEndpoint d : endpoints) {
>        String hostName = d.getAddress();
>        endPointMap.put(hostName, d);
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) /
> 1e6);
> +    watch.stop();
> +    logger.debug("Took {} ms to build endpoint map",
> watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> index 9e48d33..64ced87 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> @@ -18,14 +18,13 @@
>  package org.apache.drill.exec.store.parquet;
>
>  import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.Collection;
> -import java.util.Collections;
> -import java.util.Comparator;
> -import java.util.HashMap;
> -import java.util.LinkedList;
> -import java.util.List;
> +import java.util.*;
> +import java.util.concurrent.TimeUnit;
>
> +import com.google.common.base.Stopwatch;
> +import com.google.common.collect.ArrayListMultimap;
> +import com.google.common.collect.Lists;
> +import com.google.common.collect.Multimap;
>  import org.apache.drill.common.config.DrillConfig;
>  import org.apache.drill.exec.exception.SetupException;
>  import org.apache.drill.exec.physical.EndpointAffinity;
> @@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
>  public class ParquetGroupScan extends AbstractGroupScan {
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
>
> -  private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
> +  private ArrayListMultimap<Integer,
> ParquetRowGroupScan.RowGroupReadEntry> mappings;
>    private List<RowGroupInfo> rowGroupInfos;
> +  private Stopwatch watch = new Stopwatch();
>
>    public List<ReadEntryWithPath> getEntries() {
>      return entries;
> @@ -110,16 +110,14 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
>    }
>
>    private void readFooter() throws IOException {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      rowGroupInfos = new ArrayList();
>      long start = 0, length = 0;
>      ColumnChunkMetaData columnChunkMetaData;
>      for (ReadEntryWithPath readEntryWithPath : entries){
>        Path path = new Path(readEntryWithPath.getPath());
>        ParquetMetadata footer =
> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
> -//      FileSystem fs =
> FileSystem.get(this.storageEngine.getHadoopConfig());
> -//      FileStatus status = fs.getFileStatus(path);
> -//      ParquetMetadata footer =
> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
>        readEntryWithPath.getPath();
>
>        int i = 0;
> @@ -138,38 +136,21 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
>          i++;
>        }
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to get row group infos", (float)(tB - tA) /
> 1E6);
> +    watch.stop();
> +    logger.debug("Took {} ms to get row group infos",
> watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>
>    private void calculateEndpointBytes() {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      AffinityCalculator ac = new AffinityCalculator(fileName, fs,
> availableEndpoints);
>      for (RowGroupInfo e : rowGroupInfos) {
>        ac.setEndpointBytes(e);
>        totalBytes += e.getLength();
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB -
> tA) / 1E6);
> +    watch.stop();
> +    logger.debug("Took {} ms to calculate EndpointBytes",
> watch.elapsed(TimeUnit.MILLISECONDS));
>    }
> -/*
> -  public LinkedList<RowGroupInfo> getRowGroups() {
> -    return rowGroups;
> -  }
> -
> -  public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
> -    this.rowGroups = rowGroups;
> -  }
> -
> -  public static class ParquetFileReadEntry {
> -
> -    String path;
> -
> -    public ParquetFileReadEntry(@JsonProperty String path){
> -      this.path = path;
> -    }
> -  }
> -  */
>
>    @JsonIgnore
>    public FileSystem getFileSystem() {
> @@ -232,16 +213,22 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
>      }
>    }
>
> +  /**
> +   *Calculates the affinity each endpoint has for this scan, by adding up
> the affinity each endpoint has for each
> +   * rowGroup
> +   * @return a list of EndpointAffinity objects
> +   */
>    @Override
>    public List<EndpointAffinity> getOperatorAffinity() {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      if (this.endpointAffinities == null) {
>        HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
>        for (RowGroupInfo entry : rowGroupInfos) {
>          for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
>            long bytes = entry.getEndpointBytes().get(d);
>            float affinity = (float)bytes / (float)totalBytes;
> -          logger.error("RowGroup: {} Endpoint: {} Bytes: {}",
> entry.getRowGroupIndex(), d.getAddress(), bytes);
> +          logger.debug("RowGroup: {} Endpoint: {} Bytes: {}",
> entry.getRowGroupIndex(), d.getAddress(), bytes);
>            if (affinities.keySet().contains(d)) {
>              affinities.put(d, affinities.get(d) + affinity);
>            } else {
> @@ -256,83 +243,90 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
>        }
>        this.endpointAffinities = affinityList;
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to get operator affinity", (float)(tB - tA)
> / 1E6);
> +    watch.stop();
> +    logger.debug("Took {} ms to get operator affinity",
> watch.elapsed(TimeUnit.MILLISECONDS));
>      return this.endpointAffinities;
>    }
>
>
> +  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
>
> -
> +  /**
> +   * Assigns the rowGroups to minor fragments, preferring for each rowGroup
> the endpoints with the highest affinity for it.
> +   * @param incomingEndpoints the endpoints assigned to the minor fragments,
> in fragment order
> +   */
>    @Override
> -  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
> -    long tA = System.nanoTime();
> -    Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
> -
> -    int i = 0;
> -    for (DrillbitEndpoint endpoint : endpoints) {
> -      logger.debug("Endpoint index {}, endpoint host: {}", i++,
> endpoint.getAddress());
> +  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
> +    watch.reset();
> +    watch.start();
> +    Preconditions.checkArgument(incomingEndpoints.size() <=
> rowGroupInfos.size());
> +    mappings = ArrayListMultimap.create();
> +    ArrayList rowGroupList = new ArrayList(rowGroupInfos);
> +    List<DrillbitEndpoint> endpointLinkedlist =
> Lists.newLinkedList(incomingEndpoints);
> +    for(double cutoff : ASSIGNMENT_CUTOFFS ){
> +      scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff,
> false);
>      }
> -
> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
> -    mappings = new LinkedList[endpoints.size()];
> -    LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints,
> rowGroupInfos, 100, true, false);
> -    LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints,
> unassigned, 50, true, false);
> -    LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints,
> unassigned2, 25, true, false);
> -    LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints,
> unassigned3, 0, false, false);
> -    LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints,
> unassigned4, 0, false, true);
> -    assert unassigned5.size() == 0 : String.format("All readEntries
> should be assigned by now, but some are still unassigned");
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) /
> 1E6);
> +    scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
> +    watch.stop();
> +    logger.debug("Took {} ms to apply assignments",
> watch.elapsed(TimeUnit.MILLISECONDS));
> +    Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries
> should be assigned by now, but some are still unassigned");
> +    Preconditions.checkArgument(!rowGroupInfos.isEmpty());
>    }
>
> -  private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint>
> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean
> mustContain, boolean assignAll) {
> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
> -    LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
> -
> -    int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() *
> 1.5);
> +  public int fragmentPointer = 0;
> +
> +  /**
> +   * Performs one assignment pass over the remaining rowGroups.
> +   * @param endpointAssignments the mapping between fragment/endpoint and
> rowGroup
> +   * @param endpoints the list of drillbits, ordered by the corresponding
> fragment
> +   * @param rowGroups the list of rowGroups to assign
> +   * @param requiredPercentage the minimum fraction (0.0 to 1.0) of a
> rowGroup's max bytes that must be local to an endpoint for an assignment
> +   * @param assignAll if true, will assign even if there is no affinity
> +   */
> +  private void scanAndAssign (Multimap<Integer,
> ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments,
> List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double
> requiredPercentage, boolean assignAll) {
> +    Collections.sort(rowGroups, new ParquetReadEntryComparator());
> +    final boolean requireAffinity = requiredPercentage > 0;
> +    int maxAssignments = (int) (rowGroups.size() / endpoints.size());
> +
> +    if (maxAssignments < 1) maxAssignments = 1;
> +
> +    for(Iterator<RowGroupInfo> iter = rowGroups.iterator();
> iter.hasNext();){
> +      RowGroupInfo rowGroupInfo = iter.next();
> +      for (int i = 0; i < endpoints.size(); i++) {
> +        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
> +        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
> +        Map<DrillbitEndpoint, Long> bytesPerEndpoint =
> rowGroupInfo.getEndpointBytes();
> +        boolean haveAffinity =
> bytesPerEndpoint.containsKey(currentEndpoint) ;
>
> -    if (maxEntries < 1) maxEntries = 1;
> -
> -    int i =0;
> -    for(RowGroupInfo e : rowGroups) {
> -      boolean assigned = false;
> -      for (int j = i; j < i + endpoints.size(); j++) {
> -        DrillbitEndpoint currentEndpoint =
> endpoints.get(j%endpoints.size());
>          if (assignAll ||
> -                (e.getEndpointBytes().size() > 0 &&
> -                (e.getEndpointBytes().containsKey(currentEndpoint) ||
> !mustContain) &&
> -                (mappings[j%endpoints.size()] == null ||
> mappings[j%endpoints.size()].size() < maxEntries) &&
> -                e.getEndpointBytes().get(currentEndpoint) >=
> e.getMaxBytes() * requiredPercentage / 100)) {
> -          LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries =
> mappings[j%endpoints.size()];
> -          if(entries == null){
> -            entries = new
> LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
> -            mappings[j%endpoints.size()] = entries;
> -          }
> -          entries.add(e.getRowGroupReadEntry());
> -          logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}",
> e.getPath(), e.getStart(), currentEndpoint.getAddress());
> -          assigned = true;
> +                (!bytesPerEndpoint.isEmpty() &&
> +                        (!requireAffinity || haveAffinity) &&
> +                        (!endpointAssignments.containsKey(minorFragmentId) ||
> endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
> +                        bytesPerEndpoint.get(currentEndpoint) >=
> rowGroupInfo.getMaxBytes() * requiredPercentage)) {
> +
> +          endpointAssignments.put(minorFragmentId,
> rowGroupInfo.getRowGroupReadEntry());
> +          logger.debug("Assigned rowGroup {} to minorFragmentId {}
> endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId,
> endpoints.get(minorFragmentId).getAddress());
> +          iter.remove();
> +          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
>            break;
>          }
>        }
> -      if (!assigned) unassigned.add(e);
> -      i++;
> +
>      }
> -    return unassigned;
>    }
>
>    @Override
>    public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
> -    assert minorFragmentId < mappings.length : String.format("Mappings
> length [%d] should be longer than minor fragment id [%d] but it isn't.",
> mappings.length, minorFragmentId);
> -    for (ParquetRowGroupScan.RowGroupReadEntry rg :
> mappings[minorFragmentId]) {
> +    assert minorFragmentId < mappings.size() : String.format("Mappings
> length [%d] should be longer than minor fragment id [%d] but it isn't.",
> mappings.size(), minorFragmentId);
> +    for (ParquetRowGroupScan.RowGroupReadEntry rg :
> mappings.get(minorFragmentId)) {
>        logger.debug("minorFragmentId: {} Path: {} RowGroupIndex:
> {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
>      }
> +    Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(),
> String.format("MinorFragmentId %d has no read entries assigned",
> minorFragmentId));
>      try {
> -      return new ParquetRowGroupScan(storageEngine, engineConfig,
> mappings[minorFragmentId]);
> +      return new ParquetRowGroupScan(storageEngine, engineConfig,
> mappings.get(minorFragmentId));
>      } catch (SetupException e) {
> -      e.printStackTrace(); // TODO - fix this
> +      throw new RuntimeException("Error setting up ParquetRowGroupScan",
> e);
>      }
> -    return null;
>    }
>
>    @Override
> @@ -342,7 +336,8 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
>
>    @Override
>    public OperatorCost getCost() {
> -    return new OperatorCost(1,1,1,1);
> +    //TODO Figure out how to properly calculate cost
> +    return new OperatorCost(1,rowGroupInfos.size(),1,1);
>    }
>
>    @Override
>
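A note on the getOperatorAffinity() change above: the affinity of an endpoint is now accumulated as the fraction of the scan's total bytes stored on that endpoint, summed over all rowGroups. A minimal sketch of that accumulation, using made-up RowGroup/host stand-ins rather than Drill's RowGroupInfo and DrillbitEndpoint classes, could look like this:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class AffinitySketch {
      // Hypothetical stand-in for a row group: which host holds how many of its bytes.
      record RowGroup(Map<String, Long> bytesPerHost, long length) {}

      // Affinity of a host = (bytes of the scan stored on that host) / (total bytes of the scan).
      static Map<String, Float> affinities(List<RowGroup> rowGroups) {
        long totalBytes = rowGroups.stream().mapToLong(RowGroup::length).sum();
        Map<String, Float> result = new HashMap<>();
        for (RowGroup rg : rowGroups) {
          for (Map.Entry<String, Long> e : rg.bytesPerHost().entrySet()) {
            float affinity = (float) e.getValue() / (float) totalBytes;
            result.merge(e.getKey(), affinity, Float::sum);  // accumulate across row groups
          }
        }
        return result;
      }

      public static void main(String[] args) {
        List<RowGroup> groups = List.of(
            new RowGroup(Map.of("node-a", 100L, "node-b", 28L), 128L),
            new RowGroup(Map.of("node-b", 128L), 128L));
        System.out.println(affinities(groups));  // e.g. {node-a=0.390625, node-b=0.609375}
      }
    }

An endpoint that stores every byte of the scan ends up with affinity 1.0.
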
>
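The applyAssignments()/scanAndAssign() rewrite replaces the fixed unassigned1..5 chain with a loop over ASSIGNMENT_CUTOFFS: each pass walks the remaining rowGroups, hands them out round-robin across the minor fragments, and only accepts an assignment when the candidate endpoint holds at least cutoff * maxBytes of that rowGroup; a final pass with assignAll=true places whatever is left. A rough standalone illustration of that pass structure (the Unit type and helper names are invented, and the fragmentPointer rotation in the real code is omitted for brevity):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    public class AssignmentSketch {
      // Hypothetical work unit: bytes held per endpoint, plus the largest single-endpoint byte count.
      record Unit(int id, Map<String, Long> bytesPerEndpoint, long maxBytes) {}

      static final double[] CUTOFFS = {0.99, 0.50, 0.25, 0.01};

      // Maps minor fragment index -> assigned unit ids.
      static Map<Integer, List<Integer>> assign(List<String> endpoints, List<Unit> units) {
        Map<Integer, List<Integer>> mappings = new HashMap<>();
        List<Unit> remaining = new ArrayList<>(units);
        for (double cutoff : CUTOFFS) {
          pass(mappings, endpoints, remaining, cutoff, false);
        }
        pass(mappings, endpoints, remaining, 0.0, true);  // sweep up anything still unassigned
        return mappings;
      }

      static void pass(Map<Integer, List<Integer>> mappings, List<String> endpoints,
                       List<Unit> remaining, double cutoff, boolean assignAll) {
        int maxPerFragment = Math.max(1, remaining.size() / endpoints.size());
        for (Iterator<Unit> it = remaining.iterator(); it.hasNext(); ) {
          Unit u = it.next();
          for (int i = 0; i < endpoints.size(); i++) {  // the real code starts at a rotating fragmentPointer
            long local = u.bytesPerEndpoint().getOrDefault(endpoints.get(i), 0L);
            List<Integer> assigned = mappings.computeIfAbsent(i, k -> new ArrayList<>());
            boolean hasRoom = assigned.size() < maxPerFragment;
            if (assignAll || (hasRoom && local > 0 && local >= u.maxBytes() * cutoff)) {
              assigned.add(u.id());
              it.remove();
              break;  // this unit is placed; move on to the next one
            }
          }
        }
      }

      public static void main(String[] args) {
        List<String> eps = List.of("node-a", "node-b");
        List<Unit> units = List.of(
            new Unit(0, Map.of("node-a", 128L), 128L),
            new Unit(1, Map.of("node-b", 128L), 128L),
            new Unit(2, Map.of(), 0L));         // no locality information at all
        System.out.println(assign(eps, units)); // e.g. {0=[0, 2], 1=[1]}
      }
    }

The early passes keep assignments strongly local, each later pass relaxes the required locality, and the assignAll pass guarantees nothing is left over, mirroring the Preconditions.checkArgument(rowGroupList.isEmpty(), ...) check above.
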
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> index 4e46034..3aaa987 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> @@ -211,8 +211,8 @@ public class ParquetRecordReader implements
> RecordReader {
>        }
>        for (VarLenBinaryReader.VarLengthColumn r :
> varLengthReader.columns) {
>          output.addField(r.valueVecHolder.getValueVector());
> -        output.setNewSchema();
>        }
> +      output.setNewSchema();
>      }catch(SchemaChangeException e) {
>        throw new ExecutionSetupException("Error setting up output
> mutator.", e);
>      }
>
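The single-line ParquetRecordReader change is about when downstream operators are told the schema is ready: previously output.setNewSchema() fired once per variable-length column inside the loop, now every value vector is registered first and the schema notification happens exactly once. As a tiny illustration of the pattern (Output here is an invented interface, not Drill's OutputMutator):

    import java.util.List;

    public class SchemaNotificationSketch {
      interface Output {
        void addField(String field);
        void setNewSchema();  // assumed to be the expensive / downstream-visible call
      }

      // Register every field first, then signal the schema change once,
      // instead of signalling after each individual field as the old code did.
      static void setup(Output output, List<String> fields) {
        for (String f : fields) {
          output.addField(f);
        }
        output.setNewSchema();
      }

      public static void main(String[] args) {
        Output console = new Output() {
          public void addField(String field) { System.out.println("added field " + field); }
          public void setNewSchema()         { System.out.println("schema signalled once"); }
        };
        setup(console, List.of("integer_col", "varchar_col", "float_col"));
      }
    }
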
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> index 03fb4ec..addd288 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> @@ -21,7 +21,9 @@ import java.io.IOException;
>  import java.text.SimpleDateFormat;
>  import java.util.Date;
>  import java.util.List;
> +import java.util.concurrent.TimeUnit;
>
> +import com.google.common.base.Stopwatch;
>  import org.apache.drill.common.exceptions.ExecutionSetupException;
>  import org.apache.drill.exec.ops.FragmentContext;
>  import org.apache.drill.exec.physical.impl.BatchCreator;
> @@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
>  import parquet.hadoop.metadata.ParquetMetadata;
>
>  public class ParquetScanBatchCreator implements
> BatchCreator<ParquetRowGroupScan>{
> -  static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
> +  static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
>
>    @Override
>    public RecordBatch getBatch(FragmentContext context,
> ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws
> ExecutionSetupException {
> -    long tA = System.nanoTime(), tB;
> -    System.out.println( new SimpleDateFormat("mm:ss S").format(new
> Date()) + " :Start of ScanBatCreator.scanBatch");
> +    Stopwatch watch = new Stopwatch();
> +    watch.start();
>      Preconditions.checkArgument(children.isEmpty());
>      List<RecordReader> readers = Lists.newArrayList();
>      for(ParquetRowGroupScan.RowGroupReadEntry e :
> rowGroupScan.getRowGroupReadEntries()){
> @@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements
> BatchCreator<ParquetRowGroupScan
>          throw new ExecutionSetupException(e1);
>        }
>      }
> -    System.out.println( "Total time in method: " + ((float)
> (System.nanoTime() - tA) / 1e9));
> +    logger.debug("total time in ScanBatchCreator.getBatch: {} ms",
> watch.elapsed(TimeUnit.MILLISECONDS));
>      return new ScanBatch(context, readers.iterator());
>    }
>  }
>
>
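Several hunks in this commit swap ad-hoc System.nanoTime() arithmetic and System.out.println for Guava's Stopwatch plus slf4j debug logging. Assuming Guava and slf4j are on the classpath (the commit uses the old `new Stopwatch()` constructor; current Guava would use Stopwatch.createStarted()), the pattern reduces to:

    import java.util.concurrent.TimeUnit;

    import com.google.common.base.Stopwatch;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TimingSketch {
      static final Logger logger = LoggerFactory.getLogger(TimingSketch.class);

      public static void main(String[] args) throws InterruptedException {
        Stopwatch watch = Stopwatch.createStarted();  // `new Stopwatch().start()` on old Guava
        Thread.sleep(25);                             // stand-in for the work being measured
        watch.stop();
        // Logged at debug level so production runs are not flooded the way println was.
        logger.debug("Took {} ms to do the work", watch.elapsed(TimeUnit.MILLISECONDS));
      }
    }

Besides quieter output, this also removes the duplicated (float)(tB - tA) / 1E6 conversions scattered through the old code.
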
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> index 9a33109..72c5f34 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> @@ -35,8 +35,8 @@ public class ErrorHelper {
>      if(message != null){
>        sb.append(message);
>      }
> -
> -    do{
> +
> +    while (true) {
>        sb.append(" < ");
>        sb.append(t.getClass().getSimpleName());
>        if(t.getMessage() != null){
> @@ -44,7 +44,9 @@ public class ErrorHelper {
>          sb.append(t.getMessage());
>          sb.append(" ]");
>        }
> -    }while(t.getCause() != null && t.getCause() != t);
> +      if (t.getCause() == null || t.getCause() == t) break;
> +      t = t.getCause();
> +    }
>
>      builder.setMessage(sb.toString());
>
>
>
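The ErrorHelper fix deserves a highlight: the old do/while tested t.getCause() in its condition but never advanced t, so any exception that actually had a cause looped forever appending the same frame (the "ErrorHelper looping" in the commit message). The new loop advances t each iteration and stops when the cause is null or self-referential. A standalone sketch of the corrected walk (plain Java string building, not Drill's protobuf error builder):

    public class CauseChainSketch {
      // Builds "msg < Type:[ detail ] < Type:[ detail ] ..." by walking the cause chain,
      // advancing t each step and stopping on null or self-referential causes.
      static String describe(String message, Throwable t) {
        StringBuilder sb = new StringBuilder();
        if (message != null) {
          sb.append(message);
        }
        while (true) {
          sb.append(" < ").append(t.getClass().getSimpleName());
          if (t.getMessage() != null) {
            sb.append(":[ ").append(t.getMessage()).append(" ]");
          }
          if (t.getCause() == null || t.getCause() == t) {
            break;            // end of the chain, or an exception that is its own cause
          }
          t = t.getCause();   // the step the original do/while never took
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        Throwable inner = new IllegalStateException("disk full");
        Throwable outer = new RuntimeException("query failed", inner);
        System.out.println(describe("error while running fragment", outer));
      }
    }
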
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> index e2a00f1..18ac294 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> @@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan;
>  import org.apache.drill.exec.planner.PhysicalPlanReader;
>  import org.apache.drill.exec.proto.CoordinationProtos;
>  import org.apache.drill.exec.proto.UserProtos;
> +import org.apache.drill.exec.record.RecordBatchLoader;
> +import org.apache.drill.exec.record.VectorWrapper;
> +import org.apache.drill.exec.rpc.RpcException;
>  import org.apache.drill.exec.rpc.user.QueryResultBatch;
> +import org.apache.drill.exec.rpc.user.UserResultsListener;
> +import org.apache.drill.exec.server.BootStrapContext;
>  import org.apache.drill.exec.server.Drillbit;
>  import org.apache.drill.exec.server.RemoteServiceSet;
>  import org.apache.drill.exec.store.parquet.ParquetGroupScan;
> +import org.apache.drill.exec.vector.ValueVector;
>  import org.apache.hadoop.fs.BlockLocation;
>  import org.apache.hadoop.fs.FileStatus;
>  import org.apache.hadoop.fs.FileSystem;
> @@ -29,6 +35,7 @@ import java.io.IOException;
>  import java.nio.charset.Charset;
>  import java.util.LinkedList;
>  import java.util.List;
> +import java.util.concurrent.CountDownLatch;
>
>  import static junit.framework.Assert.assertNull;
>  import static org.junit.Assert.assertEquals;
> @@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan {
>
>    //public String fileName = "/physical_test2.json";
>    public String fileName = "parquet_scan_union_screen_physical.json";
> +//  public String fileName = "parquet-sample.json";
> +
>
>    @Test
>    @Ignore
> @@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan {
>        bit1.run();
>        client.connect();
>        List<QueryResultBatch> results =
> client.runQuery(UserProtos.QueryType.PHYSICAL,
> Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
> -      System.out.println(String.format("Got %d results", results.size()));
> +      RecordBatchLoader loader = new
> RecordBatchLoader(bit1.getContext().getAllocator());
> +      for (QueryResultBatch b : results) {
> +        System.out.println(String.format("Got %d results",
> b.getHeader().getRowCount()));
> +        loader.load(b.getHeader().getDef(), b.getData());
> +        for (VectorWrapper vw : loader) {
> +          System.out.println(vw.getValueVector().getField().getName());
> +          ValueVector vv = vw.getValueVector();
> +          for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
> +            Object o = vv.getAccessor().getObject(i);
> +            System.out.println(vv.getAccessor().getObject(i));
> +          }
> +        }
> +      }
> +      client.close();
> +    }
> +  }
> +
> +  private class ParquetResultsListener implements UserResultsListener {
> +    private CountDownLatch latch = new CountDownLatch(1);
> +    @Override
> +    public void submissionFailed(RpcException ex) {
> +      logger.error("submission failed", ex);
> +      latch.countDown();
> +    }
> +
> +    @Override
> +    public void resultArrived(QueryResultBatch result) {
> +      System.out.printf("Result batch arrived. Number of records: %d",
> result.getHeader().getRowCount());
> +      if (result.getHeader().getIsLastChunk()) latch.countDown();
> +    }
> +
> +    public void await() throws Exception {
> +      latch.await();
> +    }
> +  }
> +  @Test
> +  @Ignore
> +  public void testParseParquetPhysicalPlanRemote() throws Exception {
> +    DrillConfig config = DrillConfig.create();
> +
> +    try(DrillClient client = new DrillClient(config);){
> +      client.connect();
> +      ParquetResultsListener listener = new ParquetResultsListener();
> +      client.runQuery(UserProtos.QueryType.PHYSICAL,
> Resources.toString(Resources.getResource(fileName),Charsets.UTF_8),
> listener);
> +      listener.await();
>        client.close();
>      }
>    }
>
>
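The new ParquetResultsListener in TestParquetPhysicalPlan shows the asynchronous submission pattern: runQuery() returns immediately, the listener counts a latch down on failure or when the last chunk arrives, and the test blocks on await(). Stripped of the Drill-specific types (the Result record and CollectingListener below are invented stand-ins for QueryResultBatch and UserResultsListener), the synchronization skeleton is:

    import java.util.concurrent.CountDownLatch;

    public class ListenerSketch {
      // Minimal stand-in for the batch delivered by the RPC layer.
      record Result(int rowCount, boolean lastChunk) {}

      static class CollectingListener {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile Exception failure;

        void submissionFailed(Exception ex) {
          failure = ex;
          latch.countDown();           // unblock the waiting test on failure too
        }

        void resultArrived(Result r) {
          System.out.printf("Result batch arrived. Number of records: %d%n", r.rowCount());
          if (r.lastChunk()) {
            latch.countDown();         // only the last chunk releases the waiter
          }
        }

        void await() throws Exception {
          latch.await();
          if (failure != null) {       // unlike the test's listener, rethrow instead of only logging
            throw failure;
          }
        }
      }

      public static void main(String[] args) throws Exception {
        CollectingListener listener = new CollectingListener();
        // In the real test these callbacks are invoked from RPC threads by client.runQuery(...).
        listener.resultArrived(new Result(300, false));
        listener.resultArrived(new Result(120, true));
        listener.await();
      }
    }
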
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> index 1d91455..7a99c3f 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> +++
> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> @@ -48,6 +48,7 @@ import
> org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
>  import org.apache.drill.exec.vector.BaseDataValueVector;
>  import org.apache.drill.exec.vector.ValueVector;
>  import org.junit.BeforeClass;
> +import org.junit.Ignore;
>  import org.junit.Test;
>
>  import parquet.bytes.BytesInput;
> @@ -68,6 +69,7 @@ public class ParquetRecordReaderTest {
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
>
>    private boolean VERBOSE_DEBUG = false;
> +  private boolean checkValues = true;
>
>    static final int numberRowGroups = 20;
>    static final int recordsPerRowGroup = 300000;
> @@ -100,6 +102,20 @@ public class ParquetRecordReaderTest {
>      testParquetFullEngineLocalText(planText, fileName, i,
> numberRowGroups, recordsPerRowGroup);
>    }
>
> +  @Test
> +  @Ignore
> +  public void testLocalDistributed() throws Exception {
> +    String planName = "/parquet/parquet_scan_union_screen_physical.json";
> +    testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20,
> 300000);
> +  }
> +
> +  @Test
> +  @Ignore
> +  public void testRemoteDistributed() throws Exception {
> +    String planName = "/parquet/parquet_scan_union_screen_physical.json";
> +    testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
> +  }
> +
>
>    private class ParquetResultListener implements UserResultsListener {
>      private SettableFuture<Void> future = SettableFuture.create();
> @@ -155,8 +171,12 @@ public class ParquetRecordReaderTest {
>            if (VERBOSE_DEBUG){
>              System.out.print(vv.getAccessor().getObject(j) + ", " + (j %
> 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
>            }
> -          assertField(vv, j, (TypeProtos.MinorType) currentField.type,
> -              currentField.values[(int) (columnValCounter % 3)], (String)
> currentField.name + "/");
> +          if (checkValues) {
> +            try {
> +              assertField(vv, j, (TypeProtos.MinorType) currentField.type,
> +                currentField.values[(int) (columnValCounter % 3)],
> (String) currentField.name + "/");
> +            } catch (AssertionError e) { submissionFailed(new
> RpcException(e)); }
> +          }
>            columnValCounter++;
>          }
>          if (VERBOSE_DEBUG){
> @@ -197,7 +217,9 @@ public class ParquetRecordReaderTest {
>        batchCounter++;
>        if(result.getHeader().getIsLastChunk()){
>          for (String s : valuesChecked.keySet()) {
> +          try {
>            assertEquals("Record count incorrect for column: " + s,
> totalRecords, (long) valuesChecked.get(s));
> +          } catch (AssertionError e) { submissionFailed(new
> RpcException(e)); }
>          }
>
>          assert valuesChecked.keySet().size() > 0;
> @@ -222,11 +244,13 @@ public class ParquetRecordReaderTest {
>
>      DrillConfig config = DrillConfig.create();
>
> +    checkValues = false;
> +
>      try(DrillClient client = new DrillClient(config);){
>        client.connect();
>        RecordBatchLoader batchLoader = new
> RecordBatchLoader(client.getAllocator());
>        ParquetResultListener resultListener = new
> ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups,
> numberOfTimesRead);
> -      client.runQuery(UserProtos.QueryType.LOGICAL,
> Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8),
> resultListener);
> +      client.runQuery(UserProtos.QueryType.PHYSICAL,
> Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8),
> resultListener);
>        resultListener.get();
>      }
>
> @@ -259,6 +283,28 @@ public class ParquetRecordReaderTest {
>    }
>
>
> +  //use this method to submit physical plan
> +  public void testParquetFullEngineLocalTextDistributed(String planName,
> String filename, int numberOfTimesRead /* specified in json plan */, int
> numberOfRowGroups, int recordsPerRowGroup) throws Exception{
> +
> +    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
> +
> +    checkValues = false;
> +
> +    DrillConfig config = DrillConfig.create();
> +
> +    try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient
> client = new DrillClient(config, serviceSet.getCoordinator());){
> +      bit1.run();
> +      client.connect();
> +      RecordBatchLoader batchLoader = new
> RecordBatchLoader(client.getAllocator());
> +      ParquetResultListener resultListener = new
> ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups,
> numberOfTimesRead);
> +      Stopwatch watch = new Stopwatch().start();
> +      client.runQuery(UserProtos.QueryType.PHYSICAL,
> Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8),
> resultListener);
> +      resultListener.get();
> +      System.out.println(String.format("Took %d ms to run query",
> watch.elapsed(TimeUnit.MILLISECONDS)));
> +
> +    }
> +
> +  }
>
>    public String pad(String value, int length) {
>      return pad(value, length, " ");
>
>
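The ParquetRecordReaderTest changes address a subtlety of asserting inside an RPC callback: an AssertionError thrown on the result-delivery thread never reaches the JUnit thread, so the test could hang or pass while silently dropping the failure. The listener therefore catches the AssertionError and routes it through submissionFailed(new RpcException(e)), failing the future that the test thread is waiting on. The general shape of that hand-off (AsyncAssertSketch and its method names are invented; only Guava's SettableFuture is a real API here):

    import java.util.concurrent.ExecutionException;

    import com.google.common.util.concurrent.SettableFuture;

    public class AsyncAssertSketch {
      private final SettableFuture<Void> future = SettableFuture.create();

      // Called from an RPC / callback thread once the final batch has been counted.
      void lastChunkArrived(long expectedRecords, long actualRecords) {
        try {
          if (expectedRecords != actualRecords) {
            throw new AssertionError("Record count incorrect: expected " + expectedRecords
                + " but got " + actualRecords);
          }
          future.set(null);            // success path releases the waiting test thread
        } catch (AssertionError e) {
          future.setException(e);      // the failure is handed back to the test thread
        }
      }

      void awaitCompletion() throws InterruptedException {
        try {
          future.get();                // test thread blocks here
        } catch (ExecutionException e) {
          throw new AssertionError(e.getCause());  // resurface the callback-side failure
        }
      }

      public static void main(String[] args) throws InterruptedException {
        AsyncAssertSketch sketch = new AsyncAssertSketch();
        sketch.lastChunkArrived(300000, 300000);   // matching counts -> future completes normally
        sketch.awaitCompletion();
        System.out.println("completed without assertion failures");
      }
    }
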
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> index f508d09..5efecaf 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> +++
> b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> @@ -11,10 +11,7 @@
>      @id : 1,
>      entries : [
>      {
> -        path : "/tmp/testParquetFile_many_types_3"
> -    },
> -    {
> -        path : "/tmp/testParquetFile_many_types_3"
> +        path : "/tmp/parquet_test_file_many_types"
>      }
>      ],
>      storageengine:{
>
>


-- 
Regards,
Tanujit