You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/05/15 02:20:15 UTC
[1/2] drill git commit: DRILL-3088: Kill and cleanup remaining batches
in left upstream in NestedLoopJoin
Repository: drill
Updated Branches:
refs/heads/master f7f6efc52 -> 16ef62851
DRILL-3088: Kill and cleanup remaining batches in left upstream in NestedLoopJoin
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/80e3f74f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/80e3f74f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/80e3f74f
Branch: refs/heads/master
Commit: 80e3f74f3951d84c3209e6910440f2b0378cbd6f
Parents: f7f6efc
Author: Steven Phillips <sm...@apache.org>
Authored: Thu May 14 15:44:52 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Thu May 14 15:48:17 2015 -0700
----------------------------------------------------------------------
.../physical/impl/join/NestedLoopJoinBatch.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/80e3f74f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index de0d8e5..9bcea60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.server.options.DrillConfigIterator.Iter;
import org.apache.drill.exec.vector.AllocationHelper;
import com.google.common.base.Preconditions;
@@ -143,6 +144,8 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
// exit if we have an empty left batch
if (leftUpstream == IterOutcome.NONE) {
+ // inform upstream that we don't need anymore data and make sure we clean up any batches already in queue
+ killAndDrainRight();
return IterOutcome.NONE;
}
@@ -193,6 +196,23 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE;
}
+ private void killAndDrainRight() {
+ if (!hasMore(rightUpstream)) {
+ return;
+ }
+ right.kill(true);
+ while (hasMore(rightUpstream)) {
+ for (VectorWrapper<?> wrapper : right) {
+ wrapper.getValueVector().clear();
+ }
+ rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
+ }
+ }
+
+ private boolean hasMore(IterOutcome outcome) {
+ return outcome == IterOutcome.OK || outcome == IterOutcome.OK_NEW_SCHEMA;
+ }
+
/**
* Method generates the runtime code needed for NLJ. Other than the setup method to set the input and output value
* vector references we implement two more methods
[2/2] drill git commit: DRILL-2875: Show record number relative to
beginning of file in JsonRecordReader
Posted by sm...@apache.org.
DRILL-2875: Show record number relative to beginning of file in JsonRecordReader
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/16ef6285
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/16ef6285
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/16ef6285
Branch: refs/heads/master
Commit: 16ef62851403cf5eb9984671b332d93912c61f0c
Parents: 80e3f74
Author: Steven Phillips <sm...@apache.org>
Authored: Wed May 13 20:37:00 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Thu May 14 17:16:10 2015 -0700
----------------------------------------------------------------------
.../drill/exec/store/easy/json/JSONRecordReader.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/16ef6285/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 3d789eb..0df6227 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -55,6 +55,7 @@ public class JSONRecordReader extends AbstractRecordReader {
private final DrillFileSystem fileSystem;
private JsonProcessor jsonReader;
private int recordCount;
+ private long runningRecordCount = 0;
private final FragmentContext fragmentContext;
private OperatorContext operatorContext;
private final boolean enableAllTextMode;
@@ -154,12 +155,16 @@ public class JSONRecordReader extends AbstractRecordReader {
if (columnNr > 0) {
exceptionBuilder.pushContext("Column ", columnNr);
}
- exceptionBuilder.pushContext("Record ", recordCount + 1)
+ exceptionBuilder.pushContext("Record ", currentRecordNumberInFile())
.pushContext("File ", hadoopPath.toUri().getPath());
throw exceptionBuilder.build();
}
+ private long currentRecordNumberInFile() {
+ return runningRecordCount + recordCount + 1;
+ }
+
@Override
public int next() {
writer.allocate();
@@ -189,6 +194,7 @@ public class JSONRecordReader extends AbstractRecordReader {
// p.stop();
// System.out.println(String.format("Wrote %d records in %dms.", recordCount, p.elapsed(TimeUnit.MILLISECONDS)));
+ updateRunningCount();
return recordCount;
@@ -199,6 +205,10 @@ public class JSONRecordReader extends AbstractRecordReader {
return 0;
}
+ private void updateRunningCount() {
+ runningRecordCount += recordCount;
+ }
+
@Override
public void cleanup() {
try {