Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/23 02:06:48 UTC
[5/8] git commit: DRILL-176: Updates to affinity calculator,
fixes for parquet serialization. Fix to ErrorHelper looping
DRILL-176: Updates to affinity calculator, fixes for parquet serialization. Fix to ErrorHelper looping
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617
Branch: refs/heads/master
Commit: 7edd36170a9be291a69e44f6090474193485bf14
Parents: d6ae53e
Author: Steven Phillips <sp...@maprtech.com>
Authored: Thu Aug 22 16:18:55 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 22 16:18:55 2013 -0700
----------------------------------------------------------------------
.../drill/exec/planner/fragment/Wrapper.java | 5 +-
.../drill/exec/store/AffinityCalculator.java | 91 ++++++----
.../exec/store/parquet/ParquetGroupScan.java | 177 +++++++++----------
.../exec/store/parquet/ParquetRecordReader.java | 2 +-
.../store/parquet/ParquetScanBatchCreator.java | 10 +-
.../drill/exec/work/foreman/ErrorHelper.java | 8 +-
.../exec/store/TestParquetPhysicalPlan.java | 55 +++++-
.../store/parquet/ParquetRecordReaderTest.java | 52 +++++-
.../parquet_scan_union_screen_physical.json | 5 +-
9 files changed, 257 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index d5a24b0..8c4b0b4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -151,15 +151,12 @@ public class Wrapper {
for (int i = start; i < start + width; i++) {
endpoints.add(all.get(i % div));
}
- } else if (values.size() < width) {
- throw new NotImplementedException(
- "Haven't implemented a scenario where we have some node affinity but the affinity list is smaller than the expected width.");
} else {
// get nodes with highest affinity.
Collections.sort(values);
values = Lists.reverse(values);
for (int i = 0; i < width; i++) {
- endpoints.add(values.get(i).getEndpoint());
+ endpoints.add(values.get(i%values.size()).getEndpoint());
}
}
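The Wrapper.java hunk above replaces the NotImplementedException with wrap-around indexing, so a fragment width larger than the affinity list now cycles back through the highest-affinity nodes. A minimal, self-contained sketch of that indexing (the host names and width below are made up for illustration):

    import java.util.Arrays;
    import java.util.List;

    public class WrapAroundDemo {
      public static void main(String[] args) {
        // Hypothetical endpoints, already sorted by descending affinity
        List<String> sortedByAffinity = Arrays.asList("nodeA", "nodeB", "nodeC");
        int width = 5; // more fragments than endpoints with affinity
        for (int i = 0; i < width; i++) {
          // i % size wraps back to the front of the list once it is exhausted
          System.out.println("fragment " + i + " -> "
              + sortedByAffinity.get(i % sortedByAffinity.size()));
        }
      }
    }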
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
index b4092cc..b341ea4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
@@ -1,6 +1,7 @@
package org.apache.drill.exec.store;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableRangeMap;
import com.google.common.collect.Range;
import org.apache.drill.exec.store.parquet.ParquetGroupScan;
@@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.TimeUnit;
public class AffinityCalculator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
@@ -24,6 +26,7 @@ public class AffinityCalculator {
String fileName;
Collection<DrillbitEndpoint> endpoints;
HashMap<String,DrillbitEndpoint> endPointMap;
+ Stopwatch watch = new Stopwatch();
public AffinityCalculator(String fileName, FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
this.fs = fs;
@@ -33,16 +36,20 @@ public class AffinityCalculator {
buildEndpointMap();
}
+ /**
+ * Builds a mapping of file byte ranges to block locations
+ */
private void buildBlockMap() {
try {
+ watch.start();
FileStatus file = fs.getFileStatus(new Path(fileName));
- long tC = System.nanoTime();
blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
- long tD = System.nanoTime();
+ watch.stop();
logger.debug("Block locations: {}", blocks);
- logger.debug("Took {} ms to get Block locations", (float)(tD - tC) / 1e6);
+ logger.debug("Took {} ms to get Block locations", watch.elapsed(TimeUnit.MILLISECONDS));
} catch (IOException ioe) { throw new RuntimeException(ioe); }
- long tA = System.nanoTime();
+ watch.reset();
+ watch.start();
ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
for (BlockLocation block : blocks) {
long start = block.getOffset();
@@ -51,62 +58,72 @@ public class AffinityCalculator {
blockMapBuilder = blockMapBuilder.put(range, block);
}
blockMap = blockMapBuilder.build();
- long tB = System.nanoTime();
- logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
+ watch.stop();
+ logger.debug("Took {} ms to build block map", watch.elapsed(TimeUnit.MILLISECONDS));
}
/**
+ * For a given RowGroup, calculate how many bytes are available on each drillbit endpoint
*
- * @param entry
+ * @param rowGroup the RowGroup to calculate endpoint bytes for
*/
- public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
- long tA = System.nanoTime();
+ public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
+ watch.reset();
+ watch.start();
HashMap<String,Long> hostMap = new HashMap<>();
- long start = entry.getStart();
- long end = start + entry.getLength();
- Range<Long> entryRange = Range.closedOpen(start, end);
- ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(entryRange);
- for (Map.Entry<Range<Long>,BlockLocation> e : subRangeMap.asMapOfRanges().entrySet()) {
- String[] hosts = null;
- Range<Long> blockRange = e.getKey();
+ HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
+ long start = rowGroup.getStart();
+ long end = start + rowGroup.getLength();
+ Range<Long> rowGroupRange = Range.closedOpen(start, end);
+
+ // Find submap of ranges that intersect with the rowGroup
+ ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
+
+ // Iterate through each block in this submap and get the host for the block location
+ for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
+ String[] hosts;
+ Range<Long> blockRange = block.getKey();
try {
- hosts = e.getValue().getHosts();
- } catch (IOException ioe) { /*TODO Handle this exception */}
- Range<Long> intersection = entryRange.intersection(blockRange);
+ hosts = block.getValue().getHosts();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to get hosts for block location", ioe);
+ }
+ Range<Long> intersection = rowGroupRange.intersection(blockRange);
long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
+
+ // For each host in the current block location, add the intersecting bytes to the corresponding endpoint
for (String host : hosts) {
- if (hostMap.containsKey(host)) {
- hostMap.put(host, hostMap.get(host) + bytes);
+ DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
+ if (endpointByteMap.containsKey(endpoint)) {
+ endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) + bytes);
} else {
- hostMap.put(host, bytes);
+ if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
}
}
}
- HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
- try {
- for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
- String host = hostEntry.getKey();
- Long bytes = hostEntry.getValue();
- DrillbitEndpoint d = getDrillBitEndpoint(host);
- if (d != null ) ebs.put(d, bytes);
- }
- } catch (NullPointerException n) {}
- entry.setEndpointBytes(ebs);
- long tB = System.nanoTime();
- logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) / 1e6);
+
+ rowGroup.setEndpointBytes(endpointByteMap);
+ rowGroup.setMaxBytes(endpointByteMap.size() > 0 ? Collections.max(endpointByteMap.values()) : 0);
+ logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(), rowGroup.getStart(), rowGroup.getMaxBytes());
+ watch.stop();
+ logger.debug("Took {} ms to set endpoint bytes", watch.elapsed(TimeUnit.MILLISECONDS));
}
private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
return endPointMap.get(hostName);
}
+ /**
+ * Builds a mapping of hostnames to drillbit endpoints
+ */
private void buildEndpointMap() {
- long tA = System.nanoTime();
+ watch.reset();
+ watch.start();
endPointMap = new HashMap<String, DrillbitEndpoint>();
for (DrillbitEndpoint d : endpoints) {
String hostName = d.getAddress();
endPointMap.put(hostName, d);
}
- long tB = System.nanoTime();
- logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) / 1e6);
+ watch.stop();
+ logger.debug("Took {} ms to build endpoint map", watch.elapsed(TimeUnit.MILLISECONDS));
}
}
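The rewritten setEndpointBytes computes, for each row group, how many of its bytes sit on each endpoint by intersecting the row group's byte range with the byte ranges of the file's HDFS blocks. A minimal sketch of that intersection arithmetic using the same Guava Range API (the offsets are made up):

    import com.google.common.collect.Range;

    public class OverlapDemo {
      public static void main(String[] args) {
        // Hypothetical row group [0, 300) overlapping a block [128, 256)
        Range<Long> rowGroupRange = Range.closedOpen(0L, 300L);
        Range<Long> blockRange = Range.closedOpen(128L, 256L);
        Range<Long> intersection = rowGroupRange.intersection(blockRange);
        long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
        System.out.println(bytes + " bytes of the row group live in that block"); // prints 128
      }
    }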
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 9e48d33..64ced87 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,14 +18,13 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.physical.EndpointAffinity;
@@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
public class ParquetGroupScan extends AbstractGroupScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
- private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
+ private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings;
private List<RowGroupInfo> rowGroupInfos;
+ private Stopwatch watch = new Stopwatch();
public List<ReadEntryWithPath> getEntries() {
return entries;
@@ -110,16 +110,14 @@ public class ParquetGroupScan extends AbstractGroupScan {
}
private void readFooter() throws IOException {
- long tA = System.nanoTime();
+ watch.reset();
+ watch.start();
rowGroupInfos = new ArrayList();
long start = 0, length = 0;
ColumnChunkMetaData columnChunkMetaData;
for (ReadEntryWithPath readEntryWithPath : entries){
Path path = new Path(readEntryWithPath.getPath());
ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
-// FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig());
-// FileStatus status = fs.getFileStatus(path);
-// ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
readEntryWithPath.getPath();
int i = 0;
@@ -138,38 +136,21 @@ public class ParquetGroupScan extends AbstractGroupScan {
i++;
}
}
- long tB = System.nanoTime();
- logger.debug("Took {} ms to get row group infos", (float)(tB - tA) / 1E6);
+ watch.stop();
+ logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
}
private void calculateEndpointBytes() {
- long tA = System.nanoTime();
+ watch.reset();
+ watch.start();
AffinityCalculator ac = new AffinityCalculator(fileName, fs, availableEndpoints);
for (RowGroupInfo e : rowGroupInfos) {
ac.setEndpointBytes(e);
totalBytes += e.getLength();
}
- long tB = System.nanoTime();
- logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB - tA) / 1E6);
+ watch.stop();
+ logger.debug("Took {} ms to calculate EndpointBytes", watch.elapsed(TimeUnit.MILLISECONDS));
}
-/*
- public LinkedList<RowGroupInfo> getRowGroups() {
- return rowGroups;
- }
-
- public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
- this.rowGroups = rowGroups;
- }
-
- public static class ParquetFileReadEntry {
-
- String path;
-
- public ParquetFileReadEntry(@JsonProperty String path){
- this.path = path;
- }
- }
- */
@JsonIgnore
public FileSystem getFileSystem() {
@@ -232,16 +213,22 @@ public class ParquetGroupScan extends AbstractGroupScan {
}
}
+ /**
+ * Calculates the affinity each endpoint has for this scan by adding up the affinity each endpoint has for each
+ * rowGroup.
+ * @return a list of EndpointAffinity objects
+ */
@Override
public List<EndpointAffinity> getOperatorAffinity() {
- long tA = System.nanoTime();
+ watch.reset();
+ watch.start();
if (this.endpointAffinities == null) {
HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
for (RowGroupInfo entry : rowGroupInfos) {
for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
long bytes = entry.getEndpointBytes().get(d);
float affinity = (float)bytes / (float)totalBytes;
- logger.error("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
+ logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
if (affinities.keySet().contains(d)) {
affinities.put(d, affinities.get(d) + affinity);
} else {
@@ -256,83 +243,90 @@ public class ParquetGroupScan extends AbstractGroupScan {
}
this.endpointAffinities = affinityList;
}
- long tB = System.nanoTime();
- logger.debug("Took {} ms to get operator affinity", (float)(tB - tA) / 1E6);
+ watch.stop();
+ logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
return this.endpointAffinities;
}
+ static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
-
+ /**
+ * Assigns the row groups to the minor fragments running on the given endpoints.
+ * @param incomingEndpoints the endpoints assigned to this scan, ordered by minor fragment id
+ */
@Override
- public void applyAssignments(List<DrillbitEndpoint> endpoints) {
- long tA = System.nanoTime();
- Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
-
- int i = 0;
- for (DrillbitEndpoint endpoint : endpoints) {
- logger.debug("Endpoint index {}, endpoint host: {}", i++, endpoint.getAddress());
+ public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+ watch.reset();
+ watch.start();
+ Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size());
+ mappings = ArrayListMultimap.create();
+ ArrayList rowGroupList = new ArrayList(rowGroupInfos);
+ List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints);
+ for(double cutoff : ASSIGNMENT_CUTOFFS ){
+ scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false);
}
-
- Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
- mappings = new LinkedList[endpoints.size()];
- LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, rowGroupInfos, 100, true, false);
- LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, unassigned, 50, true, false);
- LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, unassigned2, 25, true, false);
- LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, unassigned3, 0, false, false);
- LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, unassigned4, 0, false, true);
- assert unassigned5.size() == 0 : String.format("All readEntries should be assigned by now, but some are still unassigned");
- long tB = System.nanoTime();
- logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / 1E6);
+ scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
+ watch.stop();
+ logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
+ Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
+ Preconditions.checkArgument(!rowGroupInfos.isEmpty());
}
- private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean mustContain, boolean assignAll) {
- Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
- LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
-
- int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() * 1.5);
+ public int fragmentPointer = 0;
+
+ /**
+ *
+ * @param endpointAssignments the mapping between fragment/endpoint and rowGroup
+ * @param endpoints the list of drillbits, ordered by the corresponding fragment
+ * @param rowGroups the list of rowGroups to assign
+ * @param requiredPercentage the percentage of max bytes required to make an assignment
+ * @param assignAll if true, will assign even if no affinity
+ */
+ private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) {
+ Collections.sort(rowGroups, new ParquetReadEntryComparator());
+ final boolean requireAffinity = requiredPercentage > 0;
+ int maxAssignments = (int) (rowGroups.size() / endpoints.size());
+
+ if (maxAssignments < 1) maxAssignments = 1;
+
+ for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); iter.hasNext();){
+ RowGroupInfo rowGroupInfo = iter.next();
+ for (int i = 0; i < endpoints.size(); i++) {
+ int minorFragmentId = (fragmentPointer + i) % endpoints.size();
+ DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
+ Map<DrillbitEndpoint, Long> bytesPerEndpoint = rowGroupInfo.getEndpointBytes();
+ boolean haveAffinity = bytesPerEndpoint.containsKey(currentEndpoint) ;
- if (maxEntries < 1) maxEntries = 1;
-
- int i =0;
- for(RowGroupInfo e : rowGroups) {
- boolean assigned = false;
- for (int j = i; j < i + endpoints.size(); j++) {
- DrillbitEndpoint currentEndpoint = endpoints.get(j%endpoints.size());
if (assignAll ||
- (e.getEndpointBytes().size() > 0 &&
- (e.getEndpointBytes().containsKey(currentEndpoint) || !mustContain) &&
- (mappings[j%endpoints.size()] == null || mappings[j%endpoints.size()].size() < maxEntries) &&
- e.getEndpointBytes().get(currentEndpoint) >= e.getMaxBytes() * requiredPercentage / 100)) {
- LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = mappings[j%endpoints.size()];
- if(entries == null){
- entries = new LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
- mappings[j%endpoints.size()] = entries;
- }
- entries.add(e.getRowGroupReadEntry());
- logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", e.getPath(), e.getStart(), currentEndpoint.getAddress());
- assigned = true;
+ (!bytesPerEndpoint.isEmpty() &&
+ (!requireAffinity || haveAffinity) &&
+ (!endpointAssignments.containsKey(minorFragmentId) || endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
+ bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage)) {
+
+ endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry());
+ logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress());
+ iter.remove();
+ fragmentPointer = (minorFragmentId + 1) % endpoints.size();
break;
}
}
- if (!assigned) unassigned.add(e);
- i++;
+
}
- return unassigned;
}
@Override
public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
- assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
- for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) {
+ assert minorFragmentId < mappings.size() : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), minorFragmentId);
+ for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings.get(minorFragmentId)) {
logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
}
+ Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
try {
- return new ParquetRowGroupScan(storageEngine, engineConfig, mappings[minorFragmentId]);
+ return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId));
} catch (SetupException e) {
- e.printStackTrace(); // TODO - fix this
+ throw new RuntimeException("Error setting up ParquetRowGroupScan", e);
}
- return null;
}
@Override
@@ -342,7 +336,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
@Override
public OperatorCost getCost() {
- return new OperatorCost(1,1,1,1);
+ //TODO Figure out how to properly calculate cost
+ return new OperatorCost(1,rowGroupInfos.size(),1,1);
}
@Override
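applyAssignments now makes one pass over the unassigned row groups per entry in ASSIGNMENT_CUTOFFS, lowering the required affinity each time, and a final assignAll pass places whatever remains. A standalone sketch of that control flow under simplified assumptions (each row group is reduced here to a single made-up affinity fraction, standing in for bytes / maxBytes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class CutoffDemo {
      static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};

      public static void main(String[] args) {
        // Hypothetical affinity of each remaining row group for its best endpoint
        List<Double> rowGroups = new ArrayList<>(Arrays.asList(1.0, 0.6, 0.3, 0.0));
        for (double cutoff : ASSIGNMENT_CUTOFFS) {
          for (Iterator<Double> it = rowGroups.iterator(); it.hasNext();) {
            double affinity = it.next();
            // same test shape as bytesPerEndpoint.get(e) >= maxBytes * requiredPercentage
            if (affinity >= cutoff) {
              System.out.println("assigned at cutoff " + cutoff);
              it.remove();
            }
          }
        }
        // assignAll pass: anything still unassigned is placed regardless of affinity
        System.out.println(rowGroups.size() + " row groups left for the assignAll pass");
      }
    }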
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 4e46034..3aaa987 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -211,8 +211,8 @@ public class ParquetRecordReader implements RecordReader {
}
for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
output.addField(r.valueVecHolder.getValueVector());
- output.setNewSchema();
}
+ output.setNewSchema();
}catch(SchemaChangeException e) {
throw new ExecutionSetupException("Error setting up output mutator.", e);
}
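Moving output.setNewSchema() out of the loop means downstream operators receive a single schema-change notification after all variable-length vectors have been registered, rather than one per column.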
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 03fb4ec..addd288 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import com.google.common.base.Stopwatch;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
@Override
public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
- long tA = System.nanoTime(), tB;
- System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ScanBatCreator.scanBatch");
+ Stopwatch watch = new Stopwatch();
+ watch.start();
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
@@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
throw new ExecutionSetupException(e1);
}
}
- System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9));
+ logger.debug("total time in ScanBatchCreator.getBatch: {} ms", watch.elapsed(TimeUnit.MILLISECONDS));
return new ScanBatch(context, readers.iterator());
}
}
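Several of these hunks swap System.nanoTime() arithmetic (and here a System.out.println) for Guava's Stopwatch plus a debug-level log line. A minimal sketch of the idiom as used in this patch; note that new Stopwatch() matches the Guava version of this era, while newer Guava releases require Stopwatch.createUnstarted() or Stopwatch.createStarted():

    import java.util.concurrent.TimeUnit;

    import com.google.common.base.Stopwatch;

    public class TimingDemo {
      public static void main(String[] args) throws InterruptedException {
        Stopwatch watch = new Stopwatch(); // newer Guava: Stopwatch.createUnstarted()
        watch.start();
        Thread.sleep(50); // stand-in for the timed work
        watch.stop();
        System.out.println("took " + watch.elapsed(TimeUnit.MILLISECONDS) + " ms");
        watch.reset(); // reset before reuse, as the patch does between phases
      }
    }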
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
index 9a33109..72c5f34 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
@@ -35,8 +35,8 @@ public class ErrorHelper {
if(message != null){
sb.append(message);
}
-
- do{
+
+ while (true) {
sb.append(" < ");
sb.append(t.getClass().getSimpleName());
if(t.getMessage() != null){
@@ -44,7 +44,9 @@ public class ErrorHelper {
sb.append(t.getMessage());
sb.append(" ]");
}
- }while(t.getCause() != null && t.getCause() != t);
+ if (t.getCause() == null || t.getCause() == t) break;
+ t = t.getCause();
+ }
builder.setMessage(sb.toString());
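The "ErrorHelper looping" fix from the commit title is visible here: the old do/while tested t.getCause() but never advanced t, so any exception with a cause spun forever. The rewrite appends each link, then advances t until the cause is null or self-referential. A self-contained sketch of the corrected walk (the exception messages are made up):

    public class CauseChainDemo {
      public static void main(String[] args) {
        Throwable t = new RuntimeException("outer", new IllegalStateException("inner"));
        StringBuilder sb = new StringBuilder("failure");
        while (true) {
          sb.append(" < ").append(t.getClass().getSimpleName());
          if (t.getMessage() != null) {
            sb.append(" [ ").append(t.getMessage()).append(" ]");
          }
          if (t.getCause() == null || t.getCause() == t) break; // stop at the root
          t = t.getCause(); // the advance that the original do/while was missing
        }
        // prints: failure < RuntimeException [ outer ] < IllegalStateException [ inner ]
        System.out.println(sb);
      }
    }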
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
index e2a00f1..18ac294 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
@@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +35,7 @@ import java.io.IOException;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import static junit.framework.Assert.assertNull;
import static org.junit.Assert.assertEquals;
@@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan {
//public String fileName = "/physical_test2.json";
public String fileName = "parquet_scan_union_screen_physical.json";
+// public String fileName = "parquet-sample.json";
+
@Test
@Ignore
@@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan {
bit1.run();
client.connect();
List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
- System.out.println(String.format("Got %d results", results.size()));
+ RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
+ for (QueryResultBatch b : results) {
+ System.out.println(String.format("Got %d results", b.getHeader().getRowCount()));
+ loader.load(b.getHeader().getDef(), b.getData());
+ for (VectorWrapper vw : loader) {
+ System.out.println(vw.getValueVector().getField().getName());
+ ValueVector vv = vw.getValueVector();
+ for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
+ Object o = vv.getAccessor().getObject(i);
+ System.out.println(vv.getAccessor().getObject(i));
+ }
+ }
+ }
+ client.close();
+ }
+ }
+
+ private class ParquetResultsListener implements UserResultsListener {
+ private CountDownLatch latch = new CountDownLatch(1);
+ @Override
+ public void submissionFailed(RpcException ex) {
+ logger.error("submission failed", ex);
+ latch.countDown();
+ }
+
+ @Override
+ public void resultArrived(QueryResultBatch result) {
+ System.out.printf("Result batch arrived. Number of records: %d", result.getHeader().getRowCount());
+ if (result.getHeader().getIsLastChunk()) latch.countDown();
+ }
+
+ public void await() throws Exception {
+ latch.await();
+ }
+ }
+ @Test
+ @Ignore
+ public void testParseParquetPhysicalPlanRemote() throws Exception {
+ DrillConfig config = DrillConfig.create();
+
+ try(DrillClient client = new DrillClient(config);){
+ client.connect();
+ ParquetResultsListener listener = new ParquetResultsListener();
+ client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), listener);
+ listener.await();
client.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 1d91455..7a99c3f 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import parquet.bytes.BytesInput;
@@ -68,6 +69,7 @@ public class ParquetRecordReaderTest {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
private boolean VERBOSE_DEBUG = false;
+ private boolean checkValues = true;
static final int numberRowGroups = 20;
static final int recordsPerRowGroup = 300000;
@@ -100,6 +102,20 @@ public class ParquetRecordReaderTest {
testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup);
}
+ @Test
+ @Ignore
+ public void testLocalDistributed() throws Exception {
+ String planName = "/parquet/parquet_scan_union_screen_physical.json";
+ testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, 300000);
+ }
+
+ @Test
+ @Ignore
+ public void testRemoteDistributed() throws Exception {
+ String planName = "/parquet/parquet_scan_union_screen_physical.json";
+ testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
+ }
+
private class ParquetResultListener implements UserResultsListener {
private SettableFuture<Void> future = SettableFuture.create();
@@ -155,8 +171,12 @@ public class ParquetRecordReaderTest {
if (VERBOSE_DEBUG){
System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
}
- assertField(vv, j, (TypeProtos.MinorType) currentField.type,
- currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
+ if (checkValues) {
+ try {
+ assertField(vv, j, (TypeProtos.MinorType) currentField.type,
+ currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
+ } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
+ }
columnValCounter++;
}
if (VERBOSE_DEBUG){
@@ -197,7 +217,9 @@ public class ParquetRecordReaderTest {
batchCounter++;
if(result.getHeader().getIsLastChunk()){
for (String s : valuesChecked.keySet()) {
+ try {
assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s));
+ } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
}
assert valuesChecked.keySet().size() > 0;
@@ -222,11 +244,13 @@ public class ParquetRecordReaderTest {
DrillConfig config = DrillConfig.create();
+ checkValues = false;
+
try(DrillClient client = new DrillClient(config);){
client.connect();
RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
- client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
+ client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
resultListener.get();
}
@@ -259,6 +283,28 @@ public class ParquetRecordReaderTest {
}
+ // Use this method to submit a physical plan.
+ public void testParquetFullEngineLocalTextDistributed(String planName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
+
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+ checkValues = false;
+
+ DrillConfig config = DrillConfig.create();
+
+ try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){
+ bit1.run();
+ client.connect();
+ RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
+ ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
+ Stopwatch watch = new Stopwatch().start();
+ client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8), resultListener);
+ resultListener.get();
+ System.out.println(String.format("Took %d ms to run query", watch.elapsed(TimeUnit.MILLISECONDS)));
+
+ }
+
+ }
public String pad(String value, int length) {
return pad(value, length, " ");
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
index f508d09..5efecaf 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
@@ -11,10 +11,7 @@
@id : 1,
entries : [
{
- path : "/tmp/testParquetFile_many_types_3"
- },
- {
- path : "/tmp/testParquetFile_many_types_3"
+ path : "/tmp/parquet_test_file_many_types"
}
],
storageengine:{
Re: [5/8] git commit: DRILL-176: Updates to affinity calculator,
fixes for parquet serialization. Fix to ErrorHelper looping
Posted by Tanujit Ghosh <ta...@gmail.com>.
A more detailed output; I just ran the particular test this time:
[tanujit@legolas java-exec]$ mvn -Dtest=ParquetRecordReaderTest test
[INFO] Scanning for projects...
[INFO]
[INFO]
------------------------------------------------------------------------
[INFO] Building java-exec 1.0-SNAPSHOT
[INFO]
------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:copy-resources (copy-resources) @
java-exec ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 369 resources
[INFO]
[INFO] --- maven-enforcer-plugin:1.2:enforce (no_commons_logging) @
java-exec ---
[INFO]
[INFO] --- maven-antrun-plugin:1.6:run (generate-sources) @ java-exec ---
[WARNING] Parameter tasks is deprecated, use target instead
[INFO] Executing tasks
main:
[INFO] Executed tasks
[INFO] Registering compile source root
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/generated-sources
[INFO]
[INFO] --- fmpp-maven-plugin:1.0:generate (generate-sources) @ java-exec ---
- Executing: ValueHolders.java
log4j:WARN No appenders could be found for logger (freemarker.cache).
log4j:WARN Please initialize the log4j system properly.
- Executing: VariableLengthVectors.java
- Executing: FixedValueVectors.java
- Executing: TypeHelper.java
- Executing: NullableValueVectors.java
- Executing: RepeatedValueVectors.java
[INFO] Done
[INFO]
[INFO] --- maven-remote-resources-plugin:1.1:process (default) @ java-exec
---
[INFO] Setting property: classpath.resource.loader.class =>
'org.codehaus.plexus.velocity.ContextClassLoaderResourceLoader'.
[INFO] Setting property: velocimacro.messages.on => 'false'.
[INFO] Setting property: resource.loader => 'classpath'.
[INFO] Setting property: resource.manager.logwhenfound => 'false'.
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @
java-exec ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.0:compile (default-compile) @ java-exec
---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 459 source files to
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/classes
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java:[20,21]
com.sun.corba.se.impl.interceptors.CodecFactoryImpl is internal proprietary
API and may be removed in a future release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java:
Some input files use or override a deprecated API.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java:
Recompile with -Xlint:deprecation for details.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java:
Some input files use unchecked or unsafe operations.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java:
Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources)
@ java-exec ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 36 resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.0:testCompile (default-testCompile) @
java-exec ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 126 source files to
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/test-classes
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[25,16]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[53,17]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[94,12]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[95,14]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[98,26]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[100,27]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java:
Some input files use or override a deprecated API.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java:
Recompile with -Xlint:deprecation for details.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java:
Some input files use unchecked or unsafe operations.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java:
Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- maven-surefire-plugin:2.15:test (default-test) @ java-exec ---
[INFO] Surefire report directory:
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/surefire-reports
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
08:18:15.598 [main] DEBUG org.reflections.Reflections - going to scan these
urls:
jar:file:/home/tanujit/.m2/repository/org/apache/drill/common/1.0-SNAPSHOT/common-1.0-SNAPSHOT.jar!/
file:/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/test-classes/
file:/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/classes/
jar:file:/home/tanujit/.m2/repository/org/apache/drill/common/1.0-SNAPSHOT/common-1.0-SNAPSHOT-tests.jar!/
08:18:16.124 [main] INFO org.reflections.Reflections - Reflections took
524 ms to scan 4 urls, producing 667 keys and 1620 values
08:18:16.148 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as
the default logging framework
08:18:16.150 [main] DEBUG i.n.c.MultithreadEventLoopGroup -
-Dio.netty.eventLoopThreads: 8
08:18:16.165 [main] DEBUG i.n.util.internal.PlatformDependent - UID: 1000
08:18:16.165 [main] DEBUG i.n.util.internal.PlatformDependent - Java
version: 7
08:18:16.166 [main] DEBUG i.n.util.internal.PlatformDependent -
-Dio.netty.noUnsafe: false
08:18:16.167 [main] DEBUG i.n.util.internal.PlatformDependent0 -
java.nio.ByteBuffer.cleaner: available
08:18:16.167 [main] DEBUG i.n.util.internal.PlatformDependent0 -
java.nio.Buffer.address: available
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 -
sun.misc.Unsafe.theUnsafe: available
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 -
sun.misc.Unsafe.copyMemory: available
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 -
java.nio.Bits.unaligned: true
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent -
sun.misc.Unsafe: available
08:18:16.169 [main] DEBUG i.n.util.internal.PlatformDependent -
-Dio.netty.noJavassist: false
08:18:16.227 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist:
available
08:18:16.228 [main] DEBUG i.n.util.internal.PlatformDependent -
-Dio.netty.noPreferDirect: false
08:18:16.249 [main] DEBUG io.netty.channel.nio.NioEventLoop -
-Dio.netty.noKeySetOptimization: false
08:18:16.249 [main] DEBUG io.netty.channel.nio.NioEventLoop -
-Dio.netty.selectorAutoRebuildThreshold: 512
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.numHeapArenas: 4
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.numDirectArenas: 4
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.pageSize: 8192
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.maxOrder: 11
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.chunkSize: 16777216
08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo
08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback address:
/0:0:0:0:0:0:0:1%1 (primary)
08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback address: /
127.0.0.1
08:18:16.335 [main] DEBUG io.netty.util.NetUtil -
/proc/sys/net/core/somaxconn: 128
08:18:16.344 [UserServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x30e3b962] REGISTERED
08:18:16.347 [UserServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x30e3b962] BIND(0.0.0.0/0.0.0.0:31010)
08:18:16.350 [UserServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x30e3b962, /0:0:0:0:0:0:0:0:31010] ACTIVE
08:18:16.366 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b] REGISTERED
08:18:16.367 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b] BIND(0.0.0.0/0.0.0.0:31011)
08:18:16.367 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] ACTIVE
08:18:16.571 [Client-1] DEBUG
i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated:
io.netty.util.internal.__matchers__.io.netty.buffer.ByteBufMatcher
08:18:16.574 [Client-1] DEBUG
i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated:
io.netty.util.internal.__matchers__.org.apache.drill.exec.rpc.OutboundRpcMessageMatcher
08:18:16.576 [Client-1] DEBUG
i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated:
io.netty.util.internal.__matchers__.org.apache.drill.exec.rpc.InboundRpcMessageMatcher
08:18:16.579 [UserServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x30e3b962, /0:0:0:0:0:0:0:0:31010] RECEIVED: [id: 0xf33275f9, /
192.168.0.102:34995 => /192.168.0.102:31010]
08:18:16.589 [Client-1] DEBUG io.netty.util.ResourceLeakDetector -
-Dio.netty.noResourceLeakDetection: false
08:18:16.770 [WorkManager-1] DEBUG org.apache.hadoop.security.Groups -
Creating new Groups object
08:18:16.817 [WorkManager-1] DEBUG org.apache.hadoop.security.Groups -
Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
cacheTimeout=300000
08:18:16.882 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
hadoop login
08:18:16.882 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
hadoop login commit
08:18:16.898 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
using local user:UnixPrincipal: tanujit
08:18:16.900 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
UGI loginUser:tanujit
08:18:16.940 [WorkManager-1] DEBUG org.apache.hadoop.fs.FileSystem -
Creating filesystem for file:///
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
08:18:17.922 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x1f148baa, /
192.168.0.102:45778 => /192.168.0.102:31011]
08:18:17.923 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xc4ed7542, /
192.168.0.102:45779 => /192.168.0.102:31011]
08:18:17.937 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x70a4a44c, /
192.168.0.102:45780 => /192.168.0.102:31011]
08:18:17.937 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xf6360725, /
192.168.0.102:45781 => /192.168.0.102:31011]
08:18:17.937 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x51467583, /
192.168.0.102:45782 => /192.168.0.102:31011]
08:18:17.938 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xaf6d72df, /
192.168.0.102:45783 => /192.168.0.102:31011]
08:18:17.938 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x6a8a1670, /
192.168.0.102:45784 => /192.168.0.102:31011]
08:18:17.938 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xf18205a3, /
192.168.0.102:45785 => /192.168.0.102:31011]
08:18:17.939 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xa09abc2c, /
192.168.0.102:45786 => /192.168.0.102:31011]
08:18:17.943 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x3784f526, /
192.168.0.102:45787 => /192.168.0.102:31011]
On Sat, Aug 24, 2013 at 8:15 AM, Tanujit Ghosh <ta...@gmail.com> wrote:
> Hi,
>
> When I try mvn install after these changes, the
> org.apache.drill.exec.store.parquet.ParquetRecordReaderTest is hanging.
>
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.596 sec
> - in org.apache.drill.exec.expr.ExpressionTest
> Running org.apache.drill.exec.store.TestAffinityCalculator
> Took 0.616287 ms to build range map
> Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.137 sec
> - in org.apache.drill.exec.store.TestAffinityCalculator
> Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
>
> Environment is Fedora 18, open jdk 1.7
>
> with skipTests everything is getting compiled fine.
>
> Regards
> Tanujit
>
>
>
> On Fri, Aug 23, 2013 at 5:36 AM, <ja...@apache.org> wrote:
>
>> DRILL-176: Updates to affinity calculator, fixes for parquet
>> serialization. Fix to ErrorHelper looping
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
>> Commit:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
>> Tree:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
>> Diff:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617
>>
>> Branch: refs/heads/master
>> Commit: 7edd36170a9be291a69e44f6090474193485bf14
>> Parents: d6ae53e
>> Author: Steven Phillips <sp...@maprtech.com>
>> Authored: Thu Aug 22 16:18:55 2013 -0700
>> Committer: Jacques Nadeau <ja...@apache.org>
>> Committed: Thu Aug 22 16:18:55 2013 -0700
>>
>> ----------------------------------------------------------------------
>> .../drill/exec/planner/fragment/Wrapper.java | 5 +-
>> .../drill/exec/store/AffinityCalculator.java | 91 ++++++----
>> .../exec/store/parquet/ParquetGroupScan.java | 177 +++++++++----------
>> .../exec/store/parquet/ParquetRecordReader.java | 2 +-
>> .../store/parquet/ParquetScanBatchCreator.java | 10 +-
>> .../drill/exec/work/foreman/ErrorHelper.java | 8 +-
>> .../exec/store/TestParquetPhysicalPlan.java | 55 +++++-
>> .../store/parquet/ParquetRecordReaderTest.java | 52 +++++-
>> .../parquet_scan_union_screen_physical.json | 5 +-
>> 9 files changed, 257 insertions(+), 148 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> index d5a24b0..8c4b0b4 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> @@ -151,15 +151,12 @@ public class Wrapper {
>> for (int i = start; i < start + width; i++) {
>> endpoints.add(all.get(i % div));
>> }
>> - } else if (values.size() < width) {
>> - throw new NotImplementedException(
>> - "Haven't implemented a scenario where we have some node
>> affinity but the affinity list is smaller than the expected width.");
>> } else {
>> // get nodes with highest affinity.
>> Collections.sort(values);
>> values = Lists.reverse(values);
>> for (int i = 0; i < width; i++) {
>> - endpoints.add(values.get(i).getEndpoint());
>> + endpoints.add(values.get(i%values.size()).getEndpoint());
>> }
>> }
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> index b4092cc..b341ea4 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> @@ -1,6 +1,7 @@
>> package org.apache.drill.exec.store;
>>
>>
>> +import com.google.common.base.Stopwatch;
>> import com.google.common.collect.ImmutableRangeMap;
>> import com.google.common.collect.Range;
>> import org.apache.drill.exec.store.parquet.ParquetGroupScan;
>> @@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
>>
>> import java.io.IOException;
>> import java.util.*;
>> +import java.util.concurrent.TimeUnit;
>>
>> public class AffinityCalculator {
>> static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
>> @@ -24,6 +26,7 @@ public class AffinityCalculator {
>> String fileName;
>> Collection<DrillbitEndpoint> endpoints;
>> HashMap<String,DrillbitEndpoint> endPointMap;
>> + Stopwatch watch = new Stopwatch();
>>
>> public AffinityCalculator(String fileName, FileSystem fs,
>> Collection<DrillbitEndpoint> endpoints) {
>> this.fs = fs;
>> @@ -33,16 +36,20 @@ public class AffinityCalculator {
>> buildEndpointMap();
>> }
>>
>> + /**
>> + * Builds a mapping of block locations to file byte range
>> + */
>> private void buildBlockMap() {
>> try {
>> + watch.start();
>> FileStatus file = fs.getFileStatus(new Path(fileName));
>> - long tC = System.nanoTime();
>> blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
>> - long tD = System.nanoTime();
>> + watch.stop();
>> logger.debug("Block locations: {}", blocks);
>> - logger.debug("Took {} ms to get Block locations", (float)(tD - tC)
>> / 1e6);
>> + logger.debug("Took {} ms to get Block locations",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>> } catch (IOException ioe) { throw new RuntimeException(ioe); }
>> - long tA = System.nanoTime();
>> + watch.reset();
>> + watch.start();
>> ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
>> for (BlockLocation block : blocks) {
>> long start = block.getOffset();
>> @@ -51,62 +58,72 @@ public class AffinityCalculator {
>> blockMapBuilder = blockMapBuilder.put(range, block);
>> }
>> blockMap = blockMapBuilder.build();
>> - long tB = System.nanoTime();
>> - logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
>> + watch.stop();
>> + logger.debug("Took {} ms to build block map", watch.elapsed(TimeUnit.MILLISECONDS));
>> }
>> /**
>> + * For a given RowGroup, calculate how many bytes are available on each drillbit endpoint
>> *
>> - * @param entry
>> + * @param rowGroup the RowGroup to calculate endpoint bytes for
>> */
>> - public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
>> - long tA = System.nanoTime();
>> + public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
>> + watch.reset();
>> + watch.start();
>> HashMap<String,Long> hostMap = new HashMap<>();
>> - long start = entry.getStart();
>> - long end = start + entry.getLength();
>> - Range<Long> entryRange = Range.closedOpen(start, end);
>> - ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(entryRange);
>> - for (Map.Entry<Range<Long>,BlockLocation> e : subRangeMap.asMapOfRanges().entrySet()) {
>> - String[] hosts = null;
>> - Range<Long> blockRange = e.getKey();
>> + HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
>> + long start = rowGroup.getStart();
>> + long end = start + rowGroup.getLength();
>> + Range<Long> rowGroupRange = Range.closedOpen(start, end);
>> +
>> + // Find submap of ranges that intersect with the rowGroup
>> + ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
>> +
>> + // Iterate through each block in this submap and get the host for the block location
>> + for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
>> + String[] hosts;
>> + Range<Long> blockRange = block.getKey();
>> try {
>> - hosts = e.getValue().getHosts();
>> - } catch (IOException ioe) { /*TODO Handle this exception */}
>> - Range<Long> intersection = entryRange.intersection(blockRange);
>> + hosts = block.getValue().getHosts();
>> + } catch (IOException ioe) {
>> + throw new RuntimeException("Failed to get hosts for block location", ioe);
>> + }
>> + Range<Long> intersection = rowGroupRange.intersection(blockRange);
>> long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
>> +
>> + // For each host in the current block location, add the intersecting bytes to the corresponding endpoint
>> for (String host : hosts) {
>> - if (hostMap.containsKey(host)) {
>> - hostMap.put(host, hostMap.get(host) + bytes);
>> + DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
>> + if (endpointByteMap.containsKey(endpoint)) {
>> + endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) + bytes);
>> } else {
>> - hostMap.put(host, bytes);
>> + if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
>> }
>> }
>> }
>> - HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
>> - try {
>> - for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
>> - String host = hostEntry.getKey();
>> - Long bytes = hostEntry.getValue();
>> - DrillbitEndpoint d = getDrillBitEndpoint(host);
>> - if (d != null ) ebs.put(d, bytes);
>> - }
>> - } catch (NullPointerException n) {}
>> - entry.setEndpointBytes(ebs);
>> - long tB = System.nanoTime();
>> - logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) /
>> 1e6);
>> +
>> + rowGroup.setEndpointBytes(endpointByteMap);
>> + rowGroup.setMaxBytes(endpointByteMap.size() > 0 ? Collections.max(endpointByteMap.values()) : 0);
>> + logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(), rowGroup.getStart(), rowGroup.getMaxBytes());
>> + watch.stop();
>> + logger.debug("Took {} ms to set endpoint bytes", watch.elapsed(TimeUnit.MILLISECONDS));
>> }
>>
>> private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
>> return endPointMap.get(hostName);
>> }
>>
>> + /**
>> + * Builds a mapping of drillbit endpoints to hostnames
>> + */
>> private void buildEndpointMap() {
>> - long tA = System.nanoTime();
>> + watch.reset();
>> + watch.start();
>> endPointMap = new HashMap<String, DrillbitEndpoint>();
>> for (DrillbitEndpoint d : endpoints) {
>> String hostName = d.getAddress();
>> endPointMap.put(hostName, d);
>> }
>> - long tB = System.nanoTime();
>> - logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) /
>> 1e6);
>> + watch.stop();
>> + logger.debug("Took {} ms to build endpoint map",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>> }
>> }
>>
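Two things stand out in this file: the Stopwatch-based timing (a sketch of that idiom follows the ParquetScanBatchCreator diff below) and the ImmutableRangeMap math at the heart of setEndpointBytes(). The latter keys each HDFS block by its [offset, offset + length) byte range and intersects the row group's range against the map. A compact, runnable sketch of that arithmetic, with plain host strings standing in for BlockLocation and DrillbitEndpoint:

    import java.util.HashMap;
    import java.util.Map;
    import com.google.common.collect.ImmutableRangeMap;
    import com.google.common.collect.Range;

    public class RangeIntersection {
      public static void main(String[] args) {
        // Two 100-byte "blocks", keyed by [offset, offset + length), as in buildBlockMap().
        ImmutableRangeMap<Long, String> blockMap = new ImmutableRangeMap.Builder<Long, String>()
            .put(Range.closedOpen(0L, 100L), "hostA")
            .put(Range.closedOpen(100L, 200L), "hostB")
            .build();

        // A "row group" spanning bytes [50, 150) straddles both blocks.
        Range<Long> rowGroup = Range.closedOpen(50L, 150L);

        // subRangeMap() clips each block's range to the overlap, so the width
        // of each returned key is exactly the number of shared bytes.
        Map<String, Long> bytesPerHost = new HashMap<>();
        for (Map.Entry<Range<Long>, String> e : blockMap.subRangeMap(rowGroup).asMapOfRanges().entrySet()) {
          long bytes = e.getKey().upperEndpoint() - e.getKey().lowerEndpoint();
          Long prior = bytesPerHost.get(e.getValue());
          bytesPerHost.put(e.getValue(), prior == null ? bytes : prior + bytes);
        }
        System.out.println(bytesPerHost); // {hostA=50, hostB=50} (map order may vary)
      }
    }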
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> ----------------------------------------------------------------------
>> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> index 9e48d33..64ced87 100644
>> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> @@ -18,14 +18,13 @@
>> package org.apache.drill.exec.store.parquet;
>>
>> import java.io.IOException;
>> -import java.util.ArrayList;
>> -import java.util.Collection;
>> -import java.util.Collections;
>> -import java.util.Comparator;
>> -import java.util.HashMap;
>> -import java.util.LinkedList;
>> -import java.util.List;
>> +import java.util.*;
>> +import java.util.concurrent.TimeUnit;
>>
>> +import com.google.common.base.Stopwatch;
>> +import com.google.common.collect.ArrayListMultimap;
>> +import com.google.common.collect.Lists;
>> +import com.google.common.collect.Multimap;
>> import org.apache.drill.common.config.DrillConfig;
>> import org.apache.drill.exec.exception.SetupException;
>> import org.apache.drill.exec.physical.EndpointAffinity;
>> @@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
>> public class ParquetGroupScan extends AbstractGroupScan {
>> static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
>>
>> - private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
>> + private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings;
>> private List<RowGroupInfo> rowGroupInfos;
>> + private Stopwatch watch = new Stopwatch();
>>
>> public List<ReadEntryWithPath> getEntries() {
>> return entries;
>> @@ -110,16 +110,14 @@ public class ParquetGroupScan extends AbstractGroupScan {
>> }
>>
>> private void readFooter() throws IOException {
>> - long tA = System.nanoTime();
>> + watch.reset();
>> + watch.start();
>> rowGroupInfos = new ArrayList();
>> long start = 0, length = 0;
>> ColumnChunkMetaData columnChunkMetaData;
>> for (ReadEntryWithPath readEntryWithPath : entries){
>> Path path = new Path(readEntryWithPath.getPath());
>> ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
>> -// FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig());
>> -// FileStatus status = fs.getFileStatus(path);
>> -// ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
>> readEntryWithPath.getPath();
>>
>> int i = 0;
>> @@ -138,38 +136,21 @@ public class ParquetGroupScan extends AbstractGroupScan {
>> i++;
>> }
>> }
>> - long tB = System.nanoTime();
>> - logger.debug("Took {} ms to get row group infos", (float)(tB - tA) / 1E6);
>> + watch.stop();
>> + logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
>> }
>>
>> private void calculateEndpointBytes() {
>> - long tA = System.nanoTime();
>> + watch.reset();
>> + watch.start();
>> AffinityCalculator ac = new AffinityCalculator(fileName, fs, availableEndpoints);
>> for (RowGroupInfo e : rowGroupInfos) {
>> ac.setEndpointBytes(e);
>> totalBytes += e.getLength();
>> }
>> - long tB = System.nanoTime();
>> - logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB - tA) / 1E6);
>> + watch.stop();
>> + logger.debug("Took {} ms to calculate EndpointBytes", watch.elapsed(TimeUnit.MILLISECONDS));
>> }
>> -/*
>> - public LinkedList<RowGroupInfo> getRowGroups() {
>> - return rowGroups;
>> - }
>> -
>> - public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
>> - this.rowGroups = rowGroups;
>> - }
>> -
>> - public static class ParquetFileReadEntry {
>> -
>> - String path;
>> -
>> - public ParquetFileReadEntry(@JsonProperty String path){
>> - this.path = path;
>> - }
>> - }
>> - */
>>
>> @JsonIgnore
>> public FileSystem getFileSystem() {
>> @@ -232,16 +213,22 @@ public class ParquetGroupScan extends AbstractGroupScan {
>> }
>> }
>>
>> + /**
>> + * Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each rowGroup.
>> + * @return a list of EndpointAffinity objects
>> + */
>> @Override
>> public List<EndpointAffinity> getOperatorAffinity() {
>> - long tA = System.nanoTime();
>> + watch.reset();
>> + watch.start();
>> if (this.endpointAffinities == null) {
>> HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
>> for (RowGroupInfo entry : rowGroupInfos) {
>> for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
>> long bytes = entry.getEndpointBytes().get(d);
>> float affinity = (float)bytes / (float)totalBytes;
>> - logger.error("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
>> + logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
>> if (affinities.keySet().contains(d)) {
>> affinities.put(d, affinities.get(d) + affinity);
>> } else {
>> @@ -256,83 +243,90 @@ public class ParquetGroupScan extends AbstractGroupScan {
>> }
>> this.endpointAffinities = affinityList;
>> }
>> - long tB = System.nanoTime();
>> - logger.debug("Took {} ms to get operator affinity", (float)(tB - tA) / 1E6);
>> + watch.stop();
>> + logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
>> return this.endpointAffinities;
>> }
>>
>>
>> + static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
>>
>> -
>> + /**
>> + *
>> + * @param incomingEndpoints the drillbit endpoints to assign, one per minor fragment
>> + */
>> @Override
>> - public void applyAssignments(List<DrillbitEndpoint> endpoints) {
>> - long tA = System.nanoTime();
>> - Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
>> -
>> - int i = 0;
>> - for (DrillbitEndpoint endpoint : endpoints) {
>> - logger.debug("Endpoint index {}, endpoint host: {}", i++, endpoint.getAddress());
>> + public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
>> + watch.reset();
>> + watch.start();
>> + Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size());
>> + mappings = ArrayListMultimap.create();
>> + ArrayList rowGroupList = new ArrayList(rowGroupInfos);
>> + List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints);
>> + for(double cutoff : ASSIGNMENT_CUTOFFS ){
>> + scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false);
>> }
>> -
>> - Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
>> - mappings = new LinkedList[endpoints.size()];
>> - LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, rowGroupInfos, 100, true, false);
>> - LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, unassigned, 50, true, false);
>> - LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, unassigned2, 25, true, false);
>> - LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, unassigned3, 0, false, false);
>> - LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, unassigned4, 0, false, true);
>> - assert unassigned5.size() == 0 : String.format("All readEntries should be assigned by now, but some are still unassigned");
>> - long tB = System.nanoTime();
>> - logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / 1E6);
>> + scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
>> + watch.stop();
>> + logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
>> + Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
>> + Preconditions.checkArgument(!rowGroupInfos.isEmpty());
>> }
>>
>> - private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean mustContain, boolean assignAll) {
>> - Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
>> - LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
>> -
>> - int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() * 1.5);
>> + public int fragmentPointer = 0;
>> +
>> + /**
>> + *
>> + * @param endpointAssignments the mapping between fragment/endpoint and rowGroup
>> + * @param endpoints the list of drillbits, ordered by the corresponding fragment
>> + * @param rowGroups the list of rowGroups to assign
>> + * @param requiredPercentage the percentage of max bytes required to make an assignment
>> + * @param assignAll if true, will assign even if no affinity
>> + */
>> + private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) {
>> + Collections.sort(rowGroups, new ParquetReadEntryComparator());
>> + final boolean requireAffinity = requiredPercentage > 0;
>> + int maxAssignments = (int) (rowGroups.size() / endpoints.size());
>> +
>> + if (maxAssignments < 1) maxAssignments = 1;
>> +
>> + for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); iter.hasNext();){
>> + RowGroupInfo rowGroupInfo = iter.next();
>> + for (int i = 0; i < endpoints.size(); i++) {
>> + int minorFragmentId = (fragmentPointer + i) % endpoints.size();
>> + DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
>> + Map<DrillbitEndpoint, Long> bytesPerEndpoint = rowGroupInfo.getEndpointBytes();
>> + boolean haveAffinity = bytesPerEndpoint.containsKey(currentEndpoint) ;
>>
>> - if (maxEntries < 1) maxEntries = 1;
>> -
>> - int i =0;
>> - for(RowGroupInfo e : rowGroups) {
>> - boolean assigned = false;
>> - for (int j = i; j < i + endpoints.size(); j++) {
>> - DrillbitEndpoint currentEndpoint = endpoints.get(j%endpoints.size());
>> if (assignAll ||
>> - (e.getEndpointBytes().size() > 0 &&
>> - (e.getEndpointBytes().containsKey(currentEndpoint) || !mustContain) &&
>> - (mappings[j%endpoints.size()] == null || mappings[j%endpoints.size()].size() < maxEntries) &&
>> - e.getEndpointBytes().get(currentEndpoint) >= e.getMaxBytes() * requiredPercentage / 100)) {
>> - LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = mappings[j%endpoints.size()];
>> - if(entries == null){
>> - entries = new LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
>> - mappings[j%endpoints.size()] = entries;
>> - }
>> - entries.add(e.getRowGroupReadEntry());
>> - logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", e.getPath(), e.getStart(), currentEndpoint.getAddress());
>> - assigned = true;
>> + (!bytesPerEndpoint.isEmpty() &&
>> + (!requireAffinity || haveAffinity) &&
>> + (!endpointAssignments.containsKey(minorFragmentId) || endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
>> + bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage)) {
>> +
>> + endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry());
>> + logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress());
>> + iter.remove();
>> + fragmentPointer = (minorFragmentId + 1) % endpoints.size();
>> break;
>> }
>> }
>> - if (!assigned) unassigned.add(e);
>> - i++;
>> +
>> }
>> - return unassigned;
>> }
>>
>> @Override
>> public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
>> - assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
>> - for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) {
>> + assert minorFragmentId < mappings.size() : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), minorFragmentId);
>> + for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings.get(minorFragmentId)) {
>> logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}", minorFragmentId, rg.getPath(), rg.getRowGroupIndex());
>> }
>> + Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
>> try {
>> - return new ParquetRowGroupScan(storageEngine, engineConfig, mappings[minorFragmentId]);
>> + return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId));
>> } catch (SetupException e) {
>> - e.printStackTrace(); // TODO - fix this
>> + throw new RuntimeException("Error setting up ParquetRowGroupScan", e);
>> }
>> - return null;
>> }
>>
>> @Override
>> @@ -342,7 +336,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
>>
>> @Override
>> public OperatorCost getCost() {
>> - return new OperatorCost(1,1,1,1);
>> + //TODO Figure out how to properly calculate cost
>> + return new OperatorCost(1,rowGroupInfos.size(),1,1);
>> }
>>
>> @Override
>>
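The new applyAssignments/scanAndAssign pair replaces the fixed 100/50/25/0 percent passes with the ASSIGNMENT_CUTOFFS array plus a final assign-everything sweep, and swaps the LinkedList array for a Multimap keyed by minor fragment id. A stripped-down sketch of that pass structure (the Unit class and the single fragment id are illustrative placeholders for Drill's row groups and rotating fragment pointer):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import com.google.common.collect.ArrayListMultimap;

    public class MultiPassAssign {
      static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};

      // Placeholder work unit: bytes on its best endpoint vs. the endpoint offered.
      static class Unit {
        final long maxBytes, bytesOnEndpoint;
        Unit(long max, long here) { maxBytes = max; bytesOnEndpoint = here; }
      }

      static void assign(ArrayListMultimap<Integer, Unit> out, List<Unit> units,
                         double cutoff, boolean assignAll) {
        for (Iterator<Unit> it = units.iterator(); it.hasNext();) {
          Unit u = it.next();
          // Assign when locality clears the cutoff, or unconditionally on the last pass.
          if (assignAll || u.bytesOnEndpoint >= u.maxBytes * cutoff) {
            out.put(0, u); // fragment id 0 stands in for the rotating minorFragmentId
            it.remove();   // removal replaces the old "unassigned" return list
          }
        }
      }

      public static void main(String[] args) {
        List<Unit> pending = new ArrayList<>();
        pending.add(new Unit(100, 100)); // fully local: taken in the 0.99 pass
        pending.add(new Unit(100, 30));  // partly local: taken in the 0.25 pass
        pending.add(new Unit(100, 0));   // no locality: taken in the final sweep
        ArrayListMultimap<Integer, Unit> assignments = ArrayListMultimap.create();
        for (double cutoff : ASSIGNMENT_CUTOFFS) {
          assign(assignments, pending, cutoff, false);
        }
        assign(assignments, pending, 0.0, true);
        System.out.println(assignments.get(0).size() + " assigned, " + pending.size() + " unassigned");
      }
    }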
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> ----------------------------------------------------------------------
>> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> index 4e46034..3aaa987 100644
>> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> @@ -211,8 +211,8 @@ public class ParquetRecordReader implements RecordReader {
>> }
>> for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
>> output.addField(r.valueVecHolder.getValueVector());
>> - output.setNewSchema();
>> }
>> + output.setNewSchema();
>> }catch(SchemaChangeException e) {
>> throw new ExecutionSetupException("Error setting up output mutator.", e);
>> }
>>
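This one-line move matters more than it looks: setNewSchema() previously fired inside the loop, once per variable-length column, so the output mutator was flagged with a schema change for every column; hoisting it after the loop signals a single schema change once all value vectors are registered.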
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> ----------------------------------------------------------------------
>> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> index 03fb4ec..addd288 100644
>> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> @@ -21,7 +21,9 @@ import java.io.IOException;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> import java.util.List;
>> +import java.util.concurrent.TimeUnit;
>>
>> +import com.google.common.base.Stopwatch;
>> import org.apache.drill.common.exceptions.ExecutionSetupException;
>> import org.apache.drill.exec.ops.FragmentContext;
>> import org.apache.drill.exec.physical.impl.BatchCreator;
>> @@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
>> import parquet.hadoop.metadata.ParquetMetadata;
>>
>> public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
>> - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
>> + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
>>
>> @Override
>> public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
>> - long tA = System.nanoTime(), tB;
>> - System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ScanBatCreator.scanBatch");
>> + Stopwatch watch = new Stopwatch();
>> + watch.start();
>> Preconditions.checkArgument(children.isEmpty());
>> List<RecordReader> readers = Lists.newArrayList();
>> for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
>> @@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
>> throw new ExecutionSetupException(e1);
>> }
>> }
>> - System.out.println( "Total time in method: " + ((float)
>> (System.nanoTime() - tA) / 1e9));
>> + logger.debug("total time in ScanBatchCreator.getBatch: {} ms",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>> return new ScanBatch(context, readers.iterator());
>> }
>> }
>>
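The same Stopwatch idiom now appears in AffinityCalculator, ParquetGroupScan, and here. In isolation it amounts to the sketch below; note that new Stopwatch() matches the Guava version this tree presumably builds against, while later Guava releases replace the public constructor with Stopwatch.createStarted()/createUnstarted():

    import java.util.concurrent.TimeUnit;
    import com.google.common.base.Stopwatch;

    public class TimingExample {
      static final org.slf4j.Logger logger =
          org.slf4j.LoggerFactory.getLogger(TimingExample.class);

      private final Stopwatch watch = new Stopwatch();

      void timedWork() {
        watch.reset();   // required before re-timing: start() on a running watch throws
        watch.start();
        doWork();
        watch.stop();
        logger.debug("Took {} ms", watch.elapsed(TimeUnit.MILLISECONDS));
      }

      private void doWork() { /* the operation being measured */ }
    }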
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> ----------------------------------------------------------------------
>> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> index 9a33109..72c5f34 100644
>> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> @@ -35,8 +35,8 @@ public class ErrorHelper {
>> if(message != null){
>> sb.append(message);
>> }
>> -
>> - do{
>> +
>> + while (true) {
>> sb.append(" < ");
>> sb.append(t.getClass().getSimpleName());
>> if(t.getMessage() != null){
>> @@ -44,7 +44,9 @@ public class ErrorHelper {
>> sb.append(t.getMessage());
>> sb.append(" ]");
>> }
>> - }while(t.getCause() != null && t.getCause() != t);
>> + if (t.getCause() == null || t.getCause() == t) break;
>> + t = t.getCause();
>> + }
>>
>> builder.setMessage(sb.toString());
>>
>>
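The ErrorHelper fix is the headline bug: the old do/while tested t.getCause() but never reassigned t, so any exception with a non-null cause spun forever. Extracted into a runnable form (the ":[ ... ]" message framing is abbreviated from the surrounding code):

    public class CauseChain {
      // Walk an exception's cause chain, appending each link; stops at the
      // root cause or at a self-referential cause.
      static String describe(Throwable t, String message) {
        StringBuilder sb = new StringBuilder();
        if (message != null) sb.append(message);
        while (true) {
          sb.append(" < ").append(t.getClass().getSimpleName());
          if (t.getMessage() != null) {
            sb.append(":[ ").append(t.getMessage()).append(" ]");
          }
          if (t.getCause() == null || t.getCause() == t) break;
          t = t.getCause();
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        Exception root = new IllegalStateException("root");
        Exception outer = new RuntimeException("outer", root);
        System.out.println(describe(outer, "query failed"));
        // query failed < RuntimeException:[ outer ] < IllegalStateException:[ root ]
      }
    }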
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> ----------------------------------------------------------------------
>> diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> index e2a00f1..18ac294 100644
>> --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> @@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan;
>> import org.apache.drill.exec.planner.PhysicalPlanReader;
>> import org.apache.drill.exec.proto.CoordinationProtos;
>> import org.apache.drill.exec.proto.UserProtos;
>> +import org.apache.drill.exec.record.RecordBatchLoader;
>> +import org.apache.drill.exec.record.VectorWrapper;
>> +import org.apache.drill.exec.rpc.RpcException;
>> import org.apache.drill.exec.rpc.user.QueryResultBatch;
>> +import org.apache.drill.exec.rpc.user.UserResultsListener;
>> +import org.apache.drill.exec.server.BootStrapContext;
>> import org.apache.drill.exec.server.Drillbit;
>> import org.apache.drill.exec.server.RemoteServiceSet;
>> import org.apache.drill.exec.store.parquet.ParquetGroupScan;
>> +import org.apache.drill.exec.vector.ValueVector;
>> import org.apache.hadoop.fs.BlockLocation;
>> import org.apache.hadoop.fs.FileStatus;
>> import org.apache.hadoop.fs.FileSystem;
>> @@ -29,6 +35,7 @@ import java.io.IOException;
>> import java.nio.charset.Charset;
>> import java.util.LinkedList;
>> import java.util.List;
>> +import java.util.concurrent.CountDownLatch;
>>
>> import static junit.framework.Assert.assertNull;
>> import static org.junit.Assert.assertEquals;
>> @@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan {
>>
>> //public String fileName = "/physical_test2.json";
>> public String fileName = "parquet_scan_union_screen_physical.json";
>> +// public String fileName = "parquet-sample.json";
>> +
>>
>> @Test
>> @Ignore
>> @@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan {
>> bit1.run();
>> client.connect();
>> List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
>> - System.out.println(String.format("Got %d results", results.size()));
>> + RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
>> + for (QueryResultBatch b : results) {
>> + System.out.println(String.format("Got %d results", b.getHeader().getRowCount()));
>> + loader.load(b.getHeader().getDef(), b.getData());
>> + for (VectorWrapper vw : loader) {
>> + System.out.println(vw.getValueVector().getField().getName());
>> + ValueVector vv = vw.getValueVector();
>> + for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
>> + Object o = vv.getAccessor().getObject(i);
>> + System.out.println(vv.getAccessor().getObject(i));
>> + }
>> + }
>> + }
>> + client.close();
>> + }
>> + }
>> +
>> + private class ParquetResultsListener implements UserResultsListener {
>> + private CountDownLatch latch = new CountDownLatch(1);
>> + @Override
>> + public void submissionFailed(RpcException ex) {
>> + logger.error("submission failed", ex);
>> + latch.countDown();
>> + }
>> +
>> + @Override
>> + public void resultArrived(QueryResultBatch result) {
>> + System.out.printf("Result batch arrived. Number of records: %d", result.getHeader().getRowCount());
>> + if (result.getHeader().getIsLastChunk()) latch.countDown();
>> + }
>> +
>> + public void await() throws Exception {
>> + latch.await();
>> + }
>> + }
>> + @Test
>> + @Ignore
>> + public void testParseParquetPhysicalPlanRemote() throws Exception {
>> + DrillConfig config = DrillConfig.create();
>> +
>> + try(DrillClient client = new DrillClient(config);){
>> + client.connect();
>> + ParquetResultsListener listener = new ParquetResultsListener();
>> + client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), listener);
>> + listener.await();
>> client.close();
>> }
>> }
>>
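The new ParquetResultsListener is the usual latch bridge between Drill's asynchronous UserResultsListener callbacks and a blocking test. Its skeleton, with Drill's types stubbed out:

    import java.util.concurrent.CountDownLatch;

    // Minimal latch-based listener: callbacks fire on RPC threads, while the
    // test thread blocks in await() until the last batch (or a failure) arrives.
    public class LatchListener {
      private final CountDownLatch latch = new CountDownLatch(1);

      public void onFailure(Exception ex) {
        ex.printStackTrace();
        latch.countDown(); // release the waiter on failure too, or the test hangs
      }

      public void onBatch(int rowCount, boolean isLastChunk) {
        System.out.printf("Result batch arrived. Number of records: %d%n", rowCount);
        if (isLastChunk) latch.countDown();
      }

      public void await() throws InterruptedException {
        latch.await();
      }
    }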
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> ----------------------------------------------------------------------
>> diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> index 1d91455..7a99c3f 100644
>> --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> @@ -48,6 +48,7 @@ import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
>> import org.apache.drill.exec.vector.BaseDataValueVector;
>> import org.apache.drill.exec.vector.ValueVector;
>> import org.junit.BeforeClass;
>> +import org.junit.Ignore;
>> import org.junit.Test;
>>
>> import parquet.bytes.BytesInput;
>> @@ -68,6 +69,7 @@ public class ParquetRecordReaderTest {
>> static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
>>
>> private boolean VERBOSE_DEBUG = false;
>> + private boolean checkValues = true;
>>
>> static final int numberRowGroups = 20;
>> static final int recordsPerRowGroup = 300000;
>> @@ -100,6 +102,20 @@ public class ParquetRecordReaderTest {
>> testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup);
>> }
>>
>> + @Test
>> + @Ignore
>> + public void testLocalDistributed() throws Exception {
>> + String planName = "/parquet/parquet_scan_union_screen_physical.json";
>> + testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, 300000);
>> + }
>> +
>> + @Test
>> + @Ignore
>> + public void testRemoteDistributed() throws Exception {
>> + String planName = "/parquet/parquet_scan_union_screen_physical.json";
>> + testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
>> + }
>> +
>>
>> private class ParquetResultListener implements UserResultsListener {
>> private SettableFuture<Void> future = SettableFuture.create();
>> @@ -155,8 +171,12 @@ public class ParquetRecordReaderTest {
>> if (VERBOSE_DEBUG){
>> System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
>> }
>> - assertField(vv, j, (TypeProtos.MinorType) currentField.type,
>> - currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
>> + if (checkValues) {
>> + try {
>> + assertField(vv, j, (TypeProtos.MinorType) currentField.type,
>> + currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
>> + } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
>> + }
>> columnValCounter++;
>> }
>> if (VERBOSE_DEBUG){
>> @@ -197,7 +217,9 @@ public class ParquetRecordReaderTest {
>> batchCounter++;
>> if(result.getHeader().getIsLastChunk()){
>> for (String s : valuesChecked.keySet()) {
>> + try {
>> assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s));
>> + } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
>> }
>>
>> assert valuesChecked.keySet().size() > 0;
>> @@ -222,11 +244,13 @@ public class ParquetRecordReaderTest {
>>
>> DrillConfig config = DrillConfig.create();
>>
>> + checkValues = false;
>> +
>> try(DrillClient client = new DrillClient(config);){
>> client.connect();
>> RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
>> ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
>> - client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
>> + client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
>> resultListener.get();
>> }
>>
>> @@ -259,6 +283,28 @@ public class ParquetRecordReaderTest {
>> }
>>
>>
>> + //use this method to submit physical plan
>> + public void testParquetFullEngineLocalTextDistributed(String planName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
>> +
>> + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
>> +
>> + checkValues = false;
>> +
>> + DrillConfig config = DrillConfig.create();
>> +
>> + try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){
>> + bit1.run();
>> + client.connect();
>> + RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
>> + ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
>> + Stopwatch watch = new Stopwatch().start();
>> + client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8), resultListener);
>> + resultListener.get();
>> + System.out.println(String.format("Took %d ms to run query", watch.elapsed(TimeUnit.MILLISECONDS)));
>> +
>> + }
>> +
>> + }
>>
>> public String pad(String value, int length) {
>> return pad(value, length, " ");
>>
>>
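Also worth calling out in this test: assertField/assertEquals failures inside the listener callbacks are now caught and routed through submissionFailed(). A JUnit assertion thrown on an RPC callback thread never reaches the test runner, so without the forwarding the test can hang waiting for results that never complete. The pattern in miniature (FailureSink is an illustrative stand-in for the listener interface):

    // Sketch: surface assertion failures from a callback thread to the waiting test.
    public class ForwardingAssertions {
      interface FailureSink { void submissionFailed(Exception e); }

      static void checkOnCallbackThread(FailureSink sink, long expected, long actual) {
        try {
          if (expected != actual) {
            throw new AssertionError("Record count incorrect: expected "
                + expected + " but was " + actual);
          }
        } catch (AssertionError e) {
          sink.submissionFailed(new Exception(e)); // forwarded, not swallowed
        }
      }
    }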
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> ----------------------------------------------------------------------
>> diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> index f508d09..5efecaf 100644
>> --- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> +++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> @@ -11,10 +11,7 @@
>> @id : 1,
>> entries : [
>> {
>> - path : "/tmp/testParquetFile_many_types_3"
>> - },
>> - {
>> - path : "/tmp/testParquetFile_many_types_3"
>> + path : "/tmp/parquet_test_file_many_types"
>> }
>> ],
>> storageengine:{
>>
>>
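The plan change is easy to miss: the old JSON scanned /tmp/testParquetFile_many_types_3 twice via two identical entries, while the new version has a single entry pointing at /tmp/parquet_test_file_many_types.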
>
>
> --
> Regards,
> Tanujit
>
--
Regards,
Tanujit
Re: [5/8] git commit: DRILL-176: Updates to affinity calculator,
fixes for parquet serialization. Fix to ErrorHelper looping
Posted by Tanujit Ghosh <ta...@gmail.com>.
Hi,
When I try mvn install after these changes,
org.apache.drill.exec.store.parquet.ParquetRecordReaderTest hangs.
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.596 sec - in org.apache.drill.exec.expr.ExpressionTest
Running org.apache.drill.exec.store.TestAffinityCalculator
Took 0.616287 ms to build range map
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.137 sec - in org.apache.drill.exec.store.TestAffinityCalculator
Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
The environment is Fedora 18 with OpenJDK 1.7.
With -DskipTests everything compiles fine.
Regards
Tanujit
On Fri, Aug 23, 2013 at 5:36 AM, <ja...@apache.org> wrote:
> DRILL-176: Updates to affinity calculator, fixes for parquet
> serialization. Fix to ErrorHelper looping
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617
>
> Branch: refs/heads/master
> Commit: 7edd36170a9be291a69e44f6090474193485bf14
> Parents: d6ae53e
> Author: Steven Phillips <sp...@maprtech.com>
> Authored: Thu Aug 22 16:18:55 2013 -0700
> Committer: Jacques Nadeau <ja...@apache.org>
> Committed: Thu Aug 22 16:18:55 2013 -0700
>
> ----------------------------------------------------------------------
> .../drill/exec/planner/fragment/Wrapper.java | 5 +-
> .../drill/exec/store/AffinityCalculator.java | 91 ++++++----
> .../exec/store/parquet/ParquetGroupScan.java | 177 +++++++++----------
> .../exec/store/parquet/ParquetRecordReader.java | 2 +-
> .../store/parquet/ParquetScanBatchCreator.java | 10 +-
> .../drill/exec/work/foreman/ErrorHelper.java | 8 +-
> .../exec/store/TestParquetPhysicalPlan.java | 55 +++++-
> .../store/parquet/ParquetRecordReaderTest.java | 52 +++++-
> .../parquet_scan_union_screen_physical.json | 5 +-
> 9 files changed, 257 insertions(+), 148 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> index d5a24b0..8c4b0b4 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> @@ -151,15 +151,12 @@ public class Wrapper {
> for (int i = start; i < start + width; i++) {
> endpoints.add(all.get(i % div));
> }
> - } else if (values.size() < width) {
> - throw new NotImplementedException(
> - "Haven't implemented a scenario where we have some node
> affinity but the affinity list is smaller than the expected width.");
> } else {
> // get nodes with highest affinity.
> Collections.sort(values);
> values = Lists.reverse(values);
> for (int i = 0; i < width; i++) {
> - endpoints.add(values.get(i).getEndpoint());
> + endpoints.add(values.get(i%values.size()).getEndpoint());
> }
> }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> index b4092cc..b341ea4 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> @@ -1,6 +1,7 @@
> package org.apache.drill.exec.store;
>
>
> +import com.google.common.base.Stopwatch;
> import com.google.common.collect.ImmutableRangeMap;
> import com.google.common.collect.Range;
> import org.apache.drill.exec.store.parquet.ParquetGroupScan;
> @@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
>
> import java.io.IOException;
> import java.util.*;
> +import java.util.concurrent.TimeUnit;
>
> public class AffinityCalculator {
> static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
> @@ -24,6 +26,7 @@ public class AffinityCalculator {
> String fileName;
> Collection<DrillbitEndpoint> endpoints;
> HashMap<String,DrillbitEndpoint> endPointMap;
> + Stopwatch watch = new Stopwatch();
>
> public AffinityCalculator(String fileName, FileSystem fs,
> Collection<DrillbitEndpoint> endpoints) {
> this.fs = fs;
> @@ -33,16 +36,20 @@ public class AffinityCalculator {
> buildEndpointMap();
> }
>
> + /**
> + * Builds a mapping of block locations to file byte range
> + */
> private void buildBlockMap() {
> try {
> + watch.start();
> FileStatus file = fs.getFileStatus(new Path(fileName));
> - long tC = System.nanoTime();
> blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
> - long tD = System.nanoTime();
> + watch.stop();
> logger.debug("Block locations: {}", blocks);
> - logger.debug("Took {} ms to get Block locations", (float)(tD - tC)
> / 1e6);
> + logger.debug("Took {} ms to get Block locations",
> watch.elapsed(TimeUnit.MILLISECONDS));
> } catch (IOException ioe) { throw new RuntimeException(ioe); }
> - long tA = System.nanoTime();
> + watch.reset();
> + watch.start();
> ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new
> ImmutableRangeMap.Builder<Long,BlockLocation>();
> for (BlockLocation block : blocks) {
> long start = block.getOffset();
> @@ -51,62 +58,72 @@ public class AffinityCalculator {
> blockMapBuilder = blockMapBuilder.put(range, block);
> }
> blockMap = blockMapBuilder.build();
> - long tB = System.nanoTime();
> - logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
> + watch.stop();
> + logger.debug("Took {} ms to build block map",
> watch.elapsed(TimeUnit.MILLISECONDS));
> }
> /**
> + * For a given RowGroup, calculate how many bytes are available on each
> on drillbit endpoint
> *
> - * @param entry
> + * @param rowGroup the RowGroup to calculate endpoint bytes for
> */
> - public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
> - long tA = System.nanoTime();
> + public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
> + watch.reset();
> + watch.start();
> HashMap<String,Long> hostMap = new HashMap<>();
> - long start = entry.getStart();
> - long end = start + entry.getLength();
> - Range<Long> entryRange = Range.closedOpen(start, end);
> - ImmutableRangeMap<Long,BlockLocation> subRangeMap =
> blockMap.subRangeMap(entryRange);
> - for (Map.Entry<Range<Long>,BlockLocation> e :
> subRangeMap.asMapOfRanges().entrySet()) {
> - String[] hosts = null;
> - Range<Long> blockRange = e.getKey();
> + HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
> + long start = rowGroup.getStart();
> + long end = start + rowGroup.getLength();
> + Range<Long> rowGroupRange = Range.closedOpen(start, end);
> +
> + // Find submap of ranges that intersect with the rowGroup
> + ImmutableRangeMap<Long,BlockLocation> subRangeMap =
> blockMap.subRangeMap(rowGroupRange);
> +
> + // Iterate through each block in this submap and get the host for the
> block location
> + for (Map.Entry<Range<Long>,BlockLocation> block :
> subRangeMap.asMapOfRanges().entrySet()) {
> + String[] hosts;
> + Range<Long> blockRange = block.getKey();
> try {
> - hosts = e.getValue().getHosts();
> - } catch (IOException ioe) { /*TODO Handle this exception */}
> - Range<Long> intersection = entryRange.intersection(blockRange);
> + hosts = block.getValue().getHosts();
> + } catch (IOException ioe) {
> + throw new RuntimeException("Failed to get hosts for block
> location", ioe);
> + }
> + Range<Long> intersection = rowGroupRange.intersection(blockRange);
> long bytes = intersection.upperEndpoint() -
> intersection.lowerEndpoint();
> +
> + // For each host in the current block location, add the
> intersecting bytes to the corresponding endpoint
> for (String host : hosts) {
> - if (hostMap.containsKey(host)) {
> - hostMap.put(host, hostMap.get(host) + bytes);
> + DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
> + if (endpointByteMap.containsKey(endpoint)) {
> + endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) +
> bytes);
> } else {
> - hostMap.put(host, bytes);
> + if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
> }
> }
> }
> - HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
> - try {
> - for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
> - String host = hostEntry.getKey();
> - Long bytes = hostEntry.getValue();
> - DrillbitEndpoint d = getDrillBitEndpoint(host);
> - if (d != null ) ebs.put(d, bytes);
> - }
> - } catch (NullPointerException n) {}
> - entry.setEndpointBytes(ebs);
> - long tB = System.nanoTime();
> - logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) /
> 1e6);
> +
> + rowGroup.setEndpointBytes(endpointByteMap);
> + rowGroup.setMaxBytes(endpointByteMap.size() > 0 ?
> Collections.max(endpointByteMap.values()) : 0);
> + logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(),
> rowGroup.getStart(), rowGroup.getMaxBytes());
> + watch.stop();
> + logger.debug("Took {} ms to set endpoint bytes",
> watch.elapsed(TimeUnit.MILLISECONDS));
> }
>
> private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
> return endPointMap.get(hostName);
> }
>
> + /**
> + * Builds a mapping of drillbit endpoints to hostnames
> + */
> private void buildEndpointMap() {
> - long tA = System.nanoTime();
> + watch.reset();
> + watch.start();
> endPointMap = new HashMap<String, DrillbitEndpoint>();
> for (DrillbitEndpoint d : endpoints) {
> String hostName = d.getAddress();
> endPointMap.put(hostName, d);
> }
> - long tB = System.nanoTime();
> - logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) /
> 1e6);
> + watch.stop();
> + logger.debug("Took {} ms to build endpoint map",
> watch.elapsed(TimeUnit.MILLISECONDS));
> }
> }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> index 9e48d33..64ced87 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> @@ -18,14 +18,13 @@
> package org.apache.drill.exec.store.parquet;
>
> import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.Collection;
> -import java.util.Collections;
> -import java.util.Comparator;
> -import java.util.HashMap;
> -import java.util.LinkedList;
> -import java.util.List;
> +import java.util.*;
> +import java.util.concurrent.TimeUnit;
>
> +import com.google.common.base.Stopwatch;
> +import com.google.common.collect.ArrayListMultimap;
> +import com.google.common.collect.Lists;
> +import com.google.common.collect.Multimap;
> import org.apache.drill.common.config.DrillConfig;
> import org.apache.drill.exec.exception.SetupException;
> import org.apache.drill.exec.physical.EndpointAffinity;
> @@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
> public class ParquetGroupScan extends AbstractGroupScan {
> static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
>
> - private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
> + private ArrayListMultimap<Integer,
> ParquetRowGroupScan.RowGroupReadEntry> mappings;
> private List<RowGroupInfo> rowGroupInfos;
> + private Stopwatch watch = new Stopwatch();
>
> public List<ReadEntryWithPath> getEntries() {
> return entries;
> @@ -110,16 +110,14 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
> }
>
> private void readFooter() throws IOException {
> - long tA = System.nanoTime();
> + watch.reset();
> + watch.start();
> rowGroupInfos = new ArrayList();
> long start = 0, length = 0;
> ColumnChunkMetaData columnChunkMetaData;
> for (ReadEntryWithPath readEntryWithPath : entries){
> Path path = new Path(readEntryWithPath.getPath());
> ParquetMetadata footer =
> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
> -// FileSystem fs =
> FileSystem.get(this.storageEngine.getHadoopConfig());
> -// FileStatus status = fs.getFileStatus(path);
> -// ParquetMetadata footer =
> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
> readEntryWithPath.getPath();
>
> int i = 0;
> @@ -138,38 +136,21 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
> i++;
> }
> }
> - long tB = System.nanoTime();
> - logger.debug("Took {} ms to get row group infos", (float)(tB - tA) /
> 1E6);
> + watch.stop();
> + logger.debug("Took {} ms to get row group infos",
> watch.elapsed(TimeUnit.MILLISECONDS));
> }
>
> private void calculateEndpointBytes() {
> - long tA = System.nanoTime();
> + watch.reset();
> + watch.start();
> AffinityCalculator ac = new AffinityCalculator(fileName, fs,
> availableEndpoints);
> for (RowGroupInfo e : rowGroupInfos) {
> ac.setEndpointBytes(e);
> totalBytes += e.getLength();
> }
> - long tB = System.nanoTime();
> - logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB -
> tA) / 1E6);
> + watch.stop();
> + logger.debug("Took {} ms to calculate EndpointBytes",
> watch.elapsed(TimeUnit.MILLISECONDS));
> }
> -/*
> - public LinkedList<RowGroupInfo> getRowGroups() {
> - return rowGroups;
> - }
> -
> - public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
> - this.rowGroups = rowGroups;
> - }
> -
> - public static class ParquetFileReadEntry {
> -
> - String path;
> -
> - public ParquetFileReadEntry(@JsonProperty String path){
> - this.path = path;
> - }
> - }
> - */
>
> @JsonIgnore
> public FileSystem getFileSystem() {
> @@ -232,16 +213,22 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
> }
> }
>
> + /**
> + *Calculates the affinity each endpoint has for this scan, by adding up
> the affinity each endpoint has for each
> + * rowGroup
> + * @return a list of EndpointAffinity objects
> + */
> @Override
> public List<EndpointAffinity> getOperatorAffinity() {
> - long tA = System.nanoTime();
> + watch.reset();
> + watch.start();
> if (this.endpointAffinities == null) {
> HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
> for (RowGroupInfo entry : rowGroupInfos) {
> for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
> long bytes = entry.getEndpointBytes().get(d);
> float affinity = (float)bytes / (float)totalBytes;
> - logger.error("RowGroup: {} Endpoint: {} Bytes: {}",
> entry.getRowGroupIndex(), d.getAddress(), bytes);
> + logger.debug("RowGroup: {} Endpoint: {} Bytes: {}",
> entry.getRowGroupIndex(), d.getAddress(), bytes);
> if (affinities.keySet().contains(d)) {
> affinities.put(d, affinities.get(d) + affinity);
> } else {
> @@ -256,83 +243,90 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
> }
> this.endpointAffinities = affinityList;
> }
> - long tB = System.nanoTime();
> - logger.debug("Took {} ms to get operator affinity", (float)(tB - tA)
> / 1E6);
> + watch.stop();
> + logger.debug("Took {} ms to get operator affinity",
> watch.elapsed(TimeUnit.MILLISECONDS));
> return this.endpointAffinities;
> }
>
>
> + static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
>
> -
> + /**
> + *
> + * @param incomingEndpoints
> + */
> @Override
> - public void applyAssignments(List<DrillbitEndpoint> endpoints) {
> - long tA = System.nanoTime();
> - Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
> -
> - int i = 0;
> - for (DrillbitEndpoint endpoint : endpoints) {
> - logger.debug("Endpoint index {}, endpoint host: {}", i++,
> endpoint.getAddress());
> + public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
> + watch.reset();
> + watch.start();
> + Preconditions.checkArgument(incomingEndpoints.size() <=
> rowGroupInfos.size());
> + mappings = ArrayListMultimap.create();
> + ArrayList rowGroupList = new ArrayList(rowGroupInfos);
> + List<DrillbitEndpoint> endpointLinkedlist =
> Lists.newLinkedList(incomingEndpoints);
> + for(double cutoff : ASSIGNMENT_CUTOFFS ){
> + scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff,
> false);
> }
> -
> - Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
> - mappings = new LinkedList[endpoints.size()];
> - LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints,
> rowGroupInfos, 100, true, false);
> - LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints,
> unassigned, 50, true, false);
> - LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints,
> unassigned2, 25, true, false);
> - LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints,
> unassigned3, 0, false, false);
> - LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints,
> unassigned4, 0, false, true);
> - assert unassigned5.size() == 0 : String.format("All readEntries
> should be assigned by now, but some are still unassigned");
> - long tB = System.nanoTime();
> - logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) /
> 1E6);
> + scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
> + watch.stop();
> + logger.debug("Took {} ms to apply assignments",
> watch.elapsed(TimeUnit.MILLISECONDS));
> + Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries
> should be assigned by now, but some are still unassigned");
> + Preconditions.checkArgument(!rowGroupInfos.isEmpty());
> }
>
> - private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint>
> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean
> mustContain, boolean assignAll) {
> - Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
> - LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
> -
> - int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() *
> 1.5);
> + public int fragmentPointer = 0;
> +
> + /**
> + *
> + * @param endpointAssignments the mapping between fragment/endpoint and
> rowGroup
> + * @param endpoints the list of drillbits, ordered by the corresponding
> fragment
> + * @param rowGroups the list of rowGroups to assign
> + * @param requiredPercentage the percentage of max bytes required to
> make an assignment
> + * @param assignAll if true, will assign even if no affinity
> + */
> + private void scanAndAssign (Multimap<Integer,
> ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments,
> List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double
> requiredPercentage, boolean assignAll) {
> + Collections.sort(rowGroups, new ParquetReadEntryComparator());
> + final boolean requireAffinity = requiredPercentage > 0;
> + int maxAssignments = (int) (rowGroups.size() / endpoints.size());
> +
> + if (maxAssignments < 1) maxAssignments = 1;
> +
> + for(Iterator<RowGroupInfo> iter = rowGroups.iterator();
> iter.hasNext();){
> + RowGroupInfo rowGroupInfo = iter.next();
> + for (int i = 0; i < endpoints.size(); i++) {
> + int minorFragmentId = (fragmentPointer + i) % endpoints.size();
> + DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
> + Map<DrillbitEndpoint, Long> bytesPerEndpoint =
> rowGroupInfo.getEndpointBytes();
> + boolean haveAffinity =
> bytesPerEndpoint.containsKey(currentEndpoint) ;
>
> - if (maxEntries < 1) maxEntries = 1;
> -
> - int i =0;
> - for(RowGroupInfo e : rowGroups) {
> - boolean assigned = false;
> - for (int j = i; j < i + endpoints.size(); j++) {
> - DrillbitEndpoint currentEndpoint =
> endpoints.get(j%endpoints.size());
> if (assignAll ||
> - (e.getEndpointBytes().size() > 0 &&
> - (e.getEndpointBytes().containsKey(currentEndpoint) ||
> !mustContain) &&
> - (mappings[j%endpoints.size()] == null ||
> mappings[j%endpoints.size()].size() < maxEntries) &&
> - e.getEndpointBytes().get(currentEndpoint) >=
> e.getMaxBytes() * requiredPercentage / 100)) {
> - LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries =
> mappings[j%endpoints.size()];
> - if(entries == null){
> - entries = new
> LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
> - mappings[j%endpoints.size()] = entries;
> - }
> - entries.add(e.getRowGroupReadEntry());
> - logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}",
> e.getPath(), e.getStart(), currentEndpoint.getAddress());
> - assigned = true;
> + (!bytesPerEndpoint.isEmpty() &&
> + (!requireAffinity || haveAffinity) &&
> +
> (!endpointAssignments.containsKey(minorFragmentId) ||
> endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
> + bytesPerEndpoint.get(currentEndpoint) >=
> rowGroupInfo.getMaxBytes() * requiredPercentage)) {
> +
> + endpointAssignments.put(minorFragmentId,
> rowGroupInfo.getRowGroupReadEntry());
> + logger.debug("Assigned rowGroup {} to minorFragmentId {}
> endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId,
> endpoints.get(minorFragmentId).getAddress());
> + iter.remove();
> + fragmentPointer = (minorFragmentId + 1) % endpoints.size();
> break;
> }
> }
> - if (!assigned) unassigned.add(e);
> - i++;
> +
> }
> - return unassigned;
> }
>
> @Override
> public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
> - assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
> - for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) {
> + assert minorFragmentId < mappings.size() : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), minorFragmentId);
> + for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings.get(minorFragmentId)) {
> logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}", minorFragmentId, rg.getPath(), rg.getRowGroupIndex());
> }
> + Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
> try {
> - return new ParquetRowGroupScan(storageEngine, engineConfig, mappings[minorFragmentId]);
> + return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId));
> } catch (SetupException e) {
> - e.printStackTrace(); // TODO - fix this
> + throw new RuntimeException("Error setting up ParquetRowGroupScan", e);
> }
> - return null;
> }
>
> @Override
> @@ -342,7 +336,8 @@ public class ParquetGroupScan extends
> AbstractGroupScan {
>
> @Override
> public OperatorCost getCost() {
> - return new OperatorCost(1,1,1,1);
> + //TODO Figure out how to properly calculate cost
> + return new OperatorCost(1,rowGroupInfos.size(),1,1);
> }
>
> @Override
>
>
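
The reworked scanAndAssign above is the heart of this change: row groups are offered to minor fragments round-robin (fragmentPointer rotates the starting fragment), a fragment only accepts a row group when enough of its bytes are local to that endpoint, and a final assignAll pass sweeps up anything left over. A self-contained sketch of the strategy follows; the driver thresholds (100%, 50%, 25%, then assign-all) and the simplified host/RowGroup types are illustrative assumptions, not Drill's API.

    import java.util.*;

    public class AffinityAssignmentSketch {

      static class RowGroup {
        final String path;
        final Map<String, Long> bytesPerHost; // host -> bytes of this row group stored locally there
        RowGroup(String path, Map<String, Long> bytesPerHost) {
          this.path = path;
          this.bytesPerHost = bytesPerHost;
        }
        long maxBytes() {
          return bytesPerHost.values().stream().mapToLong(Long::longValue).max().orElse(0L);
        }
        @Override
        public String toString() { return path; }
      }

      // Hypothetical driver: passes of decreasing strictness, ending with an
      // unconditional pass so every row group lands somewhere.
      static Map<Integer, List<RowGroup>> assign(List<String> hosts, List<RowGroup> rowGroups) {
        Map<Integer, List<RowGroup>> assignments = new HashMap<>();
        List<RowGroup> pending = new LinkedList<>(rowGroups);
        for (double required : new double[] {1.0, 0.5, 0.25, 0.0}) {
          scanAndAssign(assignments, hosts, pending, required, required == 0.0);
        }
        return assignments;
      }

      static void scanAndAssign(Map<Integer, List<RowGroup>> assignments, List<String> hosts,
                                List<RowGroup> rowGroups, double requiredPercentage, boolean assignAll) {
        int maxAssignments = Math.max(1, rowGroups.size() / hosts.size());
        for (Iterator<RowGroup> iter = rowGroups.iterator(); iter.hasNext(); ) {
          RowGroup rg = iter.next();
          for (int i = 0; i < hosts.size(); i++) {
            List<RowGroup> current = assignments.computeIfAbsent(i, k -> new LinkedList<>());
            Long localBytes = rg.bytesPerHost.get(hosts.get(i));
            // Accept when this pass is unconditional, or when the fragment still has
            // capacity and enough of the row group's bytes are local to this host.
            boolean affinityOk = localBytes != null && localBytes >= rg.maxBytes() * requiredPercentage;
            if (assignAll || (current.size() < maxAssignments && affinityOk)) {
              current.add(rg);
              iter.remove(); // each row group is assigned exactly once
              break;
            }
          }
        }
      }

      public static void main(String[] args) {
        List<String> hosts = Arrays.asList("nodeA", "nodeB");
        List<RowGroup> groups = Arrays.asList(
            new RowGroup("/tmp/f.parquet/rg0", Map.of("nodeA", 100L)),
            new RowGroup("/tmp/f.parquet/rg1", Map.of("nodeB", 100L)),
            new RowGroup("/tmp/f.parquet/rg2", Map.of())); // no locality information
        System.out.println(assign(hosts, groups)); // e.g. {0=[.../rg0, .../rg2], 1=[.../rg1]}
      }
    }
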
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> index 4e46034..3aaa987 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> @@ -211,8 +211,8 @@ public class ParquetRecordReader implements RecordReader {
> }
> for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
> output.addField(r.valueVecHolder.getValueVector());
> - output.setNewSchema();
> }
> + output.setNewSchema();
> }catch(SchemaChangeException e) {
> throw new ExecutionSetupException("Error setting up output mutator.", e);
> }
>
>
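
The one-line move above changes when downstream operators hear about a new schema: all value vectors are registered first, and setNewSchema() fires once afterwards rather than once per variable-length column. A tiny sketch of that ordering, with a stand-in interface in place of Drill's real OutputMutator:

    import java.util.Arrays;
    import java.util.List;

    public class SchemaNotificationSketch {

      // Stand-in for the output-mutator role: fields are registered one at a
      // time, and a schema change is signalled separately.
      interface Output {
        void addField(String vectorName);
        void setNewSchema();
      }

      static void setupColumns(Output output, List<String> columns) {
        for (String c : columns) {
          output.addField(c);  // register every field first...
        }
        output.setNewSchema(); // ...then notify downstream exactly once
      }

      public static void main(String[] args) {
        setupColumns(new Output() {
          public void addField(String v) { System.out.println("added " + v); }
          public void setNewSchema() { System.out.println("schema change signalled"); }
        }, Arrays.asList("a", "b", "c"));
      }
    }
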
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> index 03fb4ec..addd288 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> @@ -21,7 +21,9 @@ import java.io.IOException;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.List;
> +import java.util.concurrent.TimeUnit;
>
> +import com.google.common.base.Stopwatch;
> import org.apache.drill.common.exceptions.ExecutionSetupException;
> import org.apache.drill.exec.ops.FragmentContext;
> import org.apache.drill.exec.physical.impl.BatchCreator;
> @@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
> import parquet.hadoop.metadata.ParquetMetadata;
>
> public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
> - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
> + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
>
> @Override
> public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
> - long tA = System.nanoTime(), tB;
> - System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ScanBatCreator.scanBatch");
> + Stopwatch watch = new Stopwatch();
> + watch.start();
> Preconditions.checkArgument(children.isEmpty());
> List<RecordReader> readers = Lists.newArrayList();
> for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
> @@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
> throw new ExecutionSetupException(e1);
> }
> }
> - System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9));
> + logger.debug("total time in ScanBatchCreator.getBatch: {} ms", watch.elapsed(TimeUnit.MILLISECONDS));
> return new ScanBatch(context, readers.iterator());
> }
> }
>
>
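
The timing change above trades hand-rolled System.nanoTime() arithmetic and stdout prints for Guava's Stopwatch and a debug-level log line. A small sketch of the pattern; it uses Stopwatch.createStarted(), the factory method in current Guava, whereas the patch targets an older Guava whose no-arg constructor was still public:

    import java.util.concurrent.TimeUnit;
    import com.google.common.base.Stopwatch;

    public class TimingSketch {
      public static void main(String[] args) throws InterruptedException {
        Stopwatch watch = Stopwatch.createStarted();
        Thread.sleep(25); // stand-in for the work being timed
        System.out.printf("total time: %d ms%n", watch.elapsed(TimeUnit.MILLISECONDS));
      }
    }
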
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> index 9a33109..72c5f34 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> @@ -35,8 +35,8 @@ public class ErrorHelper {
> if(message != null){
> sb.append(message);
> }
> -
> - do{
> +
> + while (true) {
> sb.append(" < ");
> sb.append(t.getClass().getSimpleName());
> if(t.getMessage() != null){
> @@ -44,7 +44,9 @@ public class ErrorHelper {
> sb.append(t.getMessage());
> sb.append(" ]");
> }
> - }while(t.getCause() != null && t.getCause() != t);
> + if (t.getCause() == null || t.getCause() == t) break;
> + t = t.getCause();
> + }
>
> builder.setMessage(sb.toString());
>
>
>
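
This is the "ErrorHelper looping" fix from the commit message: the old do/while tested t.getCause() but never reassigned t, so any throwable with a non-null cause looped forever. Extracted into a self-contained method (the protobuf builder plumbing around it is omitted), the corrected walk is:

    public class CauseChainSketch {

      // Builds a "msg < Outer [ m1 ] < Inner [ m2 ]" summary, advancing t each
      // iteration and stopping on a null or self-referential cause.
      static String describe(Throwable t, String message) {
        StringBuilder sb = new StringBuilder();
        if (message != null) sb.append(message);
        while (true) {
          sb.append(" < ").append(t.getClass().getSimpleName());
          if (t.getMessage() != null) {
            sb.append(" [ ").append(t.getMessage()).append(" ]");
          }
          if (t.getCause() == null || t.getCause() == t) break;
          t = t.getCause();
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        Exception inner = new IllegalStateException("bad state");
        Exception outer = new RuntimeException("query failed", inner);
        System.out.println(describe(outer, "error while running fragment"));
      }
    }
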
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> index e2a00f1..18ac294 100644
> --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> @@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan;
> import org.apache.drill.exec.planner.PhysicalPlanReader;
> import org.apache.drill.exec.proto.CoordinationProtos;
> import org.apache.drill.exec.proto.UserProtos;
> +import org.apache.drill.exec.record.RecordBatchLoader;
> +import org.apache.drill.exec.record.VectorWrapper;
> +import org.apache.drill.exec.rpc.RpcException;
> import org.apache.drill.exec.rpc.user.QueryResultBatch;
> +import org.apache.drill.exec.rpc.user.UserResultsListener;
> +import org.apache.drill.exec.server.BootStrapContext;
> import org.apache.drill.exec.server.Drillbit;
> import org.apache.drill.exec.server.RemoteServiceSet;
> import org.apache.drill.exec.store.parquet.ParquetGroupScan;
> +import org.apache.drill.exec.vector.ValueVector;
> import org.apache.hadoop.fs.BlockLocation;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> @@ -29,6 +35,7 @@ import java.io.IOException;
> import java.nio.charset.Charset;
> import java.util.LinkedList;
> import java.util.List;
> +import java.util.concurrent.CountDownLatch;
>
> import static junit.framework.Assert.assertNull;
> import static org.junit.Assert.assertEquals;
> @@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan {
>
> //public String fileName = "/physical_test2.json";
> public String fileName = "parquet_scan_union_screen_physical.json";
> +// public String fileName = "parquet-sample.json";
> +
>
> @Test
> @Ignore
> @@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan {
> bit1.run();
> client.connect();
> List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
> - System.out.println(String.format("Got %d results", results.size()));
> + RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
> + for (QueryResultBatch b : results) {
> + System.out.println(String.format("Got %d results", b.getHeader().getRowCount()));
> + loader.load(b.getHeader().getDef(), b.getData());
> + for (VectorWrapper vw : loader) {
> + System.out.println(vw.getValueVector().getField().getName());
> + ValueVector vv = vw.getValueVector();
> + for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
> + Object o = vv.getAccessor().getObject(i);
> + System.out.println(vv.getAccessor().getObject(i));
> + }
> + }
> + }
> + client.close();
> + }
> + }
> +
> + private class ParquetResultsListener implements UserResultsListener {
> + private CountDownLatch latch = new CountDownLatch(1);
> + @Override
> + public void submissionFailed(RpcException ex) {
> + logger.error("submission failed", ex);
> + latch.countDown();
> + }
> +
> + @Override
> + public void resultArrived(QueryResultBatch result) {
> + System.out.printf("Result batch arrived. Number of records: %d",
> result.getHeader().getRowCount());
> + if (result.getHeader().getIsLastChunk()) latch.countDown();
> + }
> +
> + public void await() throws Exception {
> + latch.await();
> + }
> + }
> + @Test
> + @Ignore
> + public void testParseParquetPhysicalPlanRemote() throws Exception {
> + DrillConfig config = DrillConfig.create();
> +
> + try(DrillClient client = new DrillClient(config);){
> + client.connect();
> + ParquetResultsListener listener = new ParquetResultsListener();
> + client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), listener);
> + listener.await();
> client.close();
> }
> }
>
>
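
The new remote test relies on the latch pattern in ParquetResultsListener above: callbacks from the asynchronous query count down a single-count CountDownLatch on failure or on the last chunk, and the test thread parks in await() until then. The generic shape, with illustrative names rather than Drill's UserResultsListener interface:

    import java.util.concurrent.CountDownLatch;

    public class LatchListenerSketch {
      private final CountDownLatch latch = new CountDownLatch(1);

      // Failure path: release the waiter so the test fails fast instead of hanging.
      void submissionFailed(Exception ex) {
        ex.printStackTrace();
        latch.countDown();
      }

      // Result path: only the last chunk completes the query.
      void resultArrived(int rowCount, boolean isLastChunk) {
        System.out.printf("batch arrived with %d records%n", rowCount);
        if (isLastChunk) latch.countDown();
      }

      void await() throws InterruptedException {
        latch.await();
      }

      public static void main(String[] args) throws InterruptedException {
        LatchListenerSketch listener = new LatchListenerSketch();
        new Thread(() -> listener.resultArrived(300000, true)).start(); // simulated callback
        listener.await();
        System.out.println("query finished");
      }
    }
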
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> index 1d91455..7a99c3f 100644
> --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> @@ -48,6 +48,7 @@ import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
> import org.apache.drill.exec.vector.BaseDataValueVector;
> import org.apache.drill.exec.vector.ValueVector;
> import org.junit.BeforeClass;
> +import org.junit.Ignore;
> import org.junit.Test;
>
> import parquet.bytes.BytesInput;
> @@ -68,6 +69,7 @@ public class ParquetRecordReaderTest {
> static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
>
> private boolean VERBOSE_DEBUG = false;
> + private boolean checkValues = true;
>
> static final int numberRowGroups = 20;
> static final int recordsPerRowGroup = 300000;
> @@ -100,6 +102,20 @@ public class ParquetRecordReaderTest {
> testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup);
> }
>
> + @Test
> + @Ignore
> + public void testLocalDistributed() throws Exception {
> + String planName = "/parquet/parquet_scan_union_screen_physical.json";
> + testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, 300000);
> + }
> +
> + @Test
> + @Ignore
> + public void testRemoteDistributed() throws Exception {
> + String planName = "/parquet/parquet_scan_union_screen_physical.json";
> + testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
> + }
> +
>
> private class ParquetResultListener implements UserResultsListener {
> private SettableFuture<Void> future = SettableFuture.create();
> @@ -155,8 +171,12 @@ public class ParquetRecordReaderTest {
> if (VERBOSE_DEBUG){
> System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
> }
> - assertField(vv, j, (TypeProtos.MinorType) currentField.type,
> - currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
> + if (checkValues) {
> + try {
> + assertField(vv, j, (TypeProtos.MinorType) currentField.type,
> + currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
> + } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
> + }
> columnValCounter++;
> }
> if (VERBOSE_DEBUG){
> @@ -197,7 +217,9 @@ public class ParquetRecordReaderTest {
> batchCounter++;
> if(result.getHeader().getIsLastChunk()){
> for (String s : valuesChecked.keySet()) {
> + try {
> assertEquals("Record count incorrect for column: " + s,
> totalRecords, (long) valuesChecked.get(s));
> + } catch (AssertionError e) { submissionFailed(new
> RpcException(e)); }
> }
>
> assert valuesChecked.keySet().size() > 0;
> @@ -222,11 +244,13 @@ public class ParquetRecordReaderTest {
>
> DrillConfig config = DrillConfig.create();
>
> + checkValues = false;
> +
> try(DrillClient client = new DrillClient(config);){
> client.connect();
> RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
> ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
> - client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
> + client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
> resultListener.get();
> }
>
> @@ -259,6 +283,28 @@ public class ParquetRecordReaderTest {
> }
>
>
> + //use this method to submit physical plan
> + public void testParquetFullEngineLocalTextDistributed(String planName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
> +
> + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
> +
> + checkValues = false;
> +
> + DrillConfig config = DrillConfig.create();
> +
> + try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){
> + bit1.run();
> + client.connect();
> + RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
> + ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
> + Stopwatch watch = new Stopwatch().start();
> + client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8), resultListener);
> + resultListener.get();
> + System.out.println(String.format("Took %d ms to run query", watch.elapsed(TimeUnit.MILLISECONDS)));
> +
> + }
> +
> + }
>
> public String pad(String value, int length) {
> return pad(value, length, " ");
>
>
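
One detail in the test listener changes above deserves a note: assertField and assertEquals now run inside try/catch blocks that feed any AssertionError into submissionFailed. On an asynchronous callback thread an AssertionError never reaches JUnit, so it must be captured and re-surfaced on the test thread. A stripped-down sketch of the pattern, with Guava's SettableFuture standing in for the listener's completion handle (names are illustrative):

    import com.google.common.util.concurrent.SettableFuture;

    public class AsyncAssertSketch {
      private final SettableFuture<Void> future = SettableFuture.create();

      // Called from the callback thread: trap assertion failures instead of
      // letting them die silently on a non-test thread.
      void onLastBatch(long expected, long actual) {
        try {
          if (expected != actual) {
            throw new AssertionError("row count mismatch: " + expected + " vs " + actual);
          }
          future.set(null); // success releases the waiting test thread
        } catch (AssertionError e) {
          future.setException(e); // failure surfaces in get() below
        }
      }

      void get() throws Exception {
        future.get(); // rethrows the AssertionError wrapped in an ExecutionException
      }
    }
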
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> index f508d09..5efecaf 100644
> --- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> +++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> @@ -11,10 +11,7 @@
> @id : 1,
> entries : [
> {
> - path : "/tmp/testParquetFile_many_types_3"
> - },
> - {
> - path : "/tmp/testParquetFile_many_types_3"
> + path : "/tmp/parquet_test_file_many_types"
> }
> ],
> storageengine:{
>
>