You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/08/14 16:44:15 UTC
asterixdb git commit: [NO ISSUE][RT] Follow IFrameWriter protocol in
AbstractOneInputPushRuntime
Repository: asterixdb
Updated Branches:
refs/heads/master 9c7088ddb -> b2a7381a1
[NO ISSUE][RT] Follow IFrameWriter protocol in AbstractOneInputPushRuntime
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Many implementations of AbstractOneInputPushRuntime didn't
follow the IFrameWriter protocol, causing many unexpected
runtime exceptions.
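- This change moves the open-state tracking (isOpen) and the guarded
fail() into AbstractOneInputPushRuntime so that its subclasses implement
the protocol correctly (StringStreamingRuntimeFactory is marked in the
diff as still not following it).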
Change-Id: I5133007f298366f58b53acc9f48bc553724dd7b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2884
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b2a7381a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b2a7381a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b2a7381a
Branch: refs/heads/master
Commit: b2a7381a1dd644141affde764bc31beaf7bb482d
Parents: 9c7088d
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Sun Aug 12 21:42:26 2018 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Tue Aug 14 09:43:58 2018 -0700
----------------------------------------------------------------------
.../management/runtime/CommitRuntime.java | 27 +-------------------
.../aggreg/AggregateRuntimeFactory.java | 12 +--------
...actOneInputOneOutputOneFramePushRuntime.java | 22 +++++++---------
.../base/AbstractOneInputPushRuntime.java | 19 +++++++++++++-
.../base/AbstractOneInputSourcePushRuntime.java | 10 --------
.../operators/meta/SubplanRuntimeFactory.java | 3 ++-
.../sort/InMemorySortRuntimeFactory.java | 24 ++++++++++++-----
.../operators/std/AssignRuntimeFactory.java | 18 +------------
.../std/EmptyTupleSourceRuntimeFactory.java | 11 +++-----
.../std/NestedTupleSourceRuntimeFactory.java | 5 ----
.../std/RunningAggregateRuntimeFactory.java | 18 +------------
.../std/StreamLimitRuntimeFactory.java | 2 +-
.../std/StreamProjectRuntimeFactory.java | 2 +-
.../std/StreamSelectRuntimeFactory.java | 12 +--------
.../std/StringStreamingRuntimeFactory.java | 5 +++-
.../operators/std/UnnestRuntimeFactory.java | 2 +-
16 files changed, 63 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 74ba139..2692cc7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -89,7 +89,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
return;
}
initAccessAppend(ctx);
- writer.open();
+ super.open();
} catch (ACIDException e) {
throw HyracksDataException.create(e);
}
@@ -142,31 +142,6 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
}
@Override
- public void fail() throws HyracksDataException {
- failed = true;
- if (isSink) {
- return;
- }
- writer.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (isSink) {
- return;
- }
- try {
- flushIfNotFailed();
- } catch (Exception e) {
- writer.fail();
- throw e;
- } finally {
- writer.close();
- }
- appender.reset(frame, true);
- }
-
- @Override
public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
this.inputRecordDesc = recordDescriptor;
this.tAccess = new FrameTupleAccessor(inputRecordDesc);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index e99b61b..1f9cb91 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -67,7 +67,6 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
private boolean first = true;
- private boolean isOpen = false;
@Override
public void open() throws HyracksDataException {
@@ -81,8 +80,7 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
for (int i = 0; i < aggregFactories.length; i++) {
aggregs[i].init();
}
- isOpen = true;
- writer.open();
+ super.open();
}
@Override
@@ -121,14 +119,6 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
aggregs[f].step(tupleRef);
}
}
-
- @Override
- public void fail() throws HyracksDataException {
- failed = true;
- if (isOpen) {
- writer.fail();
- }
- }
};
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index a7468a7..71b44d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.comm.IFrameTupleAppender;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -51,25 +52,20 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr
@Override
public void close() throws HyracksDataException {
- HyracksDataException closeException = null;
+ if (!isOpen) {
+ return;
+ }
+ Throwable closeException = null;
try {
flushIfNotFailed();
} catch (Exception e) {
- closeException = HyracksDataException.create(e);
- writer.fail();
+ closeException = e;
+ fail(closeException);
} finally {
- try {
- writer.close();
- } catch (Exception e) {
- if (closeException == null) {
- closeException = HyracksDataException.create(e);
- } else {
- closeException.addSuppressed(e);
- }
- }
+ closeException = CleanupUtils.close(writer, closeException);
}
if (closeException != null) {
- throw closeException;
+ throw HyracksDataException.create(closeException);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index 5cced8d..c7d2d94 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -27,6 +27,7 @@ public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
protected IFrameWriter writer;
protected RecordDescriptor outputRecordDesc;
protected boolean failed;
+ protected boolean isOpen;
@Override
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
@@ -35,8 +36,24 @@ public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
}
@Override
+ public void open() throws HyracksDataException {
+ isOpen = true;
+ writer.open();
+ }
+
+ @Override
public void fail() throws HyracksDataException {
failed = true;
- writer.fail();
+ if (isOpen) {
+ writer.fail();
+ }
+ }
+
+ protected void fail(Throwable failure) {
+ try {
+ fail();
+ } catch (Throwable th) {
+ failure.addSuppressed(th);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
index 35563e0..cccfd62 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -32,16 +32,6 @@ public abstract class AbstractOneInputSourcePushRuntime extends AbstractOneInput
}
@Override
- public void close() throws HyracksDataException {
- // close is a no op since this operator completes operating in open()
- }
-
- @Override
- public void fail() throws HyracksDataException {
- // fail is a no op since if a failure happened, the operator would've already called fail() on downstream
- }
-
- @Override
public void flush() throws HyracksDataException {
// flush will never be called on this runtime
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 159fde7..3cee12d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -151,7 +151,8 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
@Override
public void open() throws HyracksDataException {
- writer.open();
+ // writer opened many times?
+ super.open();
if (first) {
first = false;
initAccessAppendRef(ctx);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index f251bb7..2453029 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -27,6 +27,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
@@ -69,7 +70,7 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (frameSorter == null) {
IFrameBufferManager manager = new VariableFrameMemoryManager(
new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
@@ -87,11 +88,22 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
@Override
public void close() throws HyracksDataException {
- try {
- frameSorter.sort();
- frameSorter.flush(writer);
- } finally {
- writer.close();
+ Throwable failure = null;
+ if (isOpen) {
+ try {
+ if (!failed) {
+ frameSorter.sort();
+ frameSorter.flush(writer);
+ }
+ } catch (Throwable th) {
+ failure = th;
+ fail(th);
+ } finally {
+ failure = CleanupUtils.close(writer, failure);
+ }
+ }
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
}
}
};
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index b1b652f..5b36c5f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -96,7 +96,6 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
private boolean first = true;
- private boolean isOpen = false;
private int tupleIndex = 0;
@Override
@@ -109,15 +108,7 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
eval[i] = evalFactories[i].createScalarEvaluator(ctx);
}
}
- isOpen = true;
- writer.open();
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (isOpen) {
- super.close();
- }
+ super.open();
}
@Override
@@ -177,13 +168,6 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
}
@Override
- public void fail() throws HyracksDataException {
- if (isOpen) {
- super.fail();
- }
- }
-
- @Override
public void flush() throws HyracksDataException {
appender.flush(writer);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 7bd924d..9ca3cd6 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -48,7 +48,7 @@ public class EmptyTupleSourceRuntimeFactory extends AbstractPushRuntimeFactory {
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
throw new IllegalStateException();
}
@@ -56,13 +56,10 @@ public class EmptyTupleSourceRuntimeFactory extends AbstractPushRuntimeFactory {
}
@Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
public void close() throws HyracksDataException {
- writer.close();
+ if (isOpen) {
+ writer.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index f94672d..832cb22 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -49,11 +49,6 @@ public class NestedTupleSourceRuntimeFactory extends AbstractPushRuntimeFactory
initAccessAppend(ctx);
}
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- }
-
public void writeTuple(ByteBuffer inputBuffer, int tIndex) throws HyracksDataException {
tAccess.reset(inputBuffer);
appendTupleToFrame(tIndex);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 33b7725..ca58d4d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -89,7 +89,6 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun
private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
private boolean first = true;
- private boolean isOpen = false;
@Override
public void open() throws HyracksDataException {
@@ -104,22 +103,7 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun
for (int i = 0; i < runningAggregates.length; i++) {
raggs[i].init();
}
- isOpen = true;
- writer.open();
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (isOpen) {
- super.close();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- if (isOpen) {
- writer.fail();
- }
+ super.open();
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 59df402..aca5bf1 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -72,7 +72,7 @@ public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeF
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (evalMaxObjects == null) {
initAccessAppendRef(ctx);
evalMaxObjects = maxObjectsEvalFactory.createScalarEvaluator(ctx);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index a8ca082..713a99c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -53,7 +53,7 @@ public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntim
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (first) {
first = false;
initAccessAppend(ctx);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 171544d..933e640 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -92,8 +92,7 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
initAccessAppendFieldRef(ctx);
eval = cond.createScalarEvaluator(ctx);
}
- writer.open();
-
+ super.open();
//prepare nullTupleBuilder
if (retainMissing && missingWriter == null) {
missingWriter = missingWriterFactory.createMissingWriter();
@@ -105,15 +104,6 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
}
@Override
- public void close() throws HyracksDataException {
- try {
- flushIfNotFailed();
- } finally {
- writer.close();
- }
- }
-
- @Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 53974b2..7e5c346 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -43,6 +43,9 @@ public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRunt
private char fieldDelimiter;
private ITupleParserFactory parserFactory;
+ /*
+ * NOTE: This operator doesn't follow the IFrameWriter protocol
+ */
public StringStreamingRuntimeFactory(String command, IPrinterFactory[] printerFactories, char fieldDelimiter,
ITupleParserFactory parserFactory) {
super(null);
@@ -129,7 +132,6 @@ public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRunt
first = false;
initAccessAppendRef(ctx);
}
-
try {
ITupleParser parser = parserFactory.createTupleParser(ctx);
process = Runtime.getRuntime().exec(command);
@@ -141,6 +143,7 @@ public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRunt
new DumpInStreamToPrintStream(process.getErrorStream(), System.err);
dumpStderr = new Thread(disps);
dumpStderr.start();
+ super.open();
} catch (IOException e) {
throw HyracksDataException.create(e);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 914f4a0..22189ac 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -94,7 +94,7 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (tRef == null) {
initAccessAppendRef(ctx);
}