You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/08/14 16:44:15 UTC

asterixdb git commit: [NO ISSUE][RT] Follow IFrameWriter protocol in AbstractOneInputPushRuntime

Repository: asterixdb
Updated Branches:
  refs/heads/master 9c7088ddb -> b2a7381a1


[NO ISSUE][RT] Follow IFrameWriter protocol in AbstractOneInputPushRuntime

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Many implementations of AbstractOneInputPushRuntime didn't
  follow the IFrameWriter protocol, causing many unexpected
  runtime exceptions.
- This change ensures that all of the subclasses implement the
  protocol correctly.

Change-Id: I5133007f298366f58b53acc9f48bc553724dd7b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2884
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b2a7381a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b2a7381a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b2a7381a

Branch: refs/heads/master
Commit: b2a7381a1dd644141affde764bc31beaf7bb482d
Parents: 9c7088d
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Sun Aug 12 21:42:26 2018 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Tue Aug 14 09:43:58 2018 -0700

----------------------------------------------------------------------
 .../management/runtime/CommitRuntime.java       | 27 +-------------------
 .../aggreg/AggregateRuntimeFactory.java         | 12 +--------
 ...actOneInputOneOutputOneFramePushRuntime.java | 22 +++++++---------
 .../base/AbstractOneInputPushRuntime.java       | 19 +++++++++++++-
 .../base/AbstractOneInputSourcePushRuntime.java | 10 --------
 .../operators/meta/SubplanRuntimeFactory.java   |  3 ++-
 .../sort/InMemorySortRuntimeFactory.java        | 24 ++++++++++++-----
 .../operators/std/AssignRuntimeFactory.java     | 18 +------------
 .../std/EmptyTupleSourceRuntimeFactory.java     | 11 +++-----
 .../std/NestedTupleSourceRuntimeFactory.java    |  5 ----
 .../std/RunningAggregateRuntimeFactory.java     | 18 +------------
 .../std/StreamLimitRuntimeFactory.java          |  2 +-
 .../std/StreamProjectRuntimeFactory.java        |  2 +-
 .../std/StreamSelectRuntimeFactory.java         | 12 +--------
 .../std/StringStreamingRuntimeFactory.java      |  5 +++-
 .../operators/std/UnnestRuntimeFactory.java     |  2 +-
 16 files changed, 63 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 74ba139..2692cc7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -89,7 +89,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
                 return;
             }
             initAccessAppend(ctx);
-            writer.open();
+            super.open();
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);
         }
@@ -142,31 +142,6 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
     }
 
     @Override
-    public void fail() throws HyracksDataException {
-        failed = true;
-        if (isSink) {
-            return;
-        }
-        writer.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (isSink) {
-            return;
-        }
-        try {
-            flushIfNotFailed();
-        } catch (Exception e) {
-            writer.fail();
-            throw e;
-        } finally {
-            writer.close();
-        }
-        appender.reset(frame, true);
-    }
-
-    @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
         this.inputRecordDesc = recordDescriptor;
         this.tAccess = new FrameTupleAccessor(inputRecordDesc);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index e99b61b..1f9cb91 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -67,7 +67,6 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
             private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
 
             private boolean first = true;
-            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -81,8 +80,7 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
                 for (int i = 0; i < aggregFactories.length; i++) {
                     aggregs[i].init();
                 }
-                isOpen = true;
-                writer.open();
+                super.open();
             }
 
             @Override
@@ -121,14 +119,6 @@ public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFac
                     aggregs[f].step(tupleRef);
                 }
             }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                failed = true;
-                if (isOpen) {
-                    writer.fail();
-                }
-            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index a7468a7..71b44d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.comm.IFrameTupleAppender;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -51,25 +52,20 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr
 
     @Override
     public void close() throws HyracksDataException {
-        HyracksDataException closeException = null;
+        if (!isOpen) {
+            return;
+        }
+        Throwable closeException = null;
         try {
             flushIfNotFailed();
         } catch (Exception e) {
-            closeException = HyracksDataException.create(e);
-            writer.fail();
+            closeException = e;
+            fail(closeException);
         } finally {
-            try {
-                writer.close();
-            } catch (Exception e) {
-                if (closeException == null) {
-                    closeException = HyracksDataException.create(e);
-                } else {
-                    closeException.addSuppressed(e);
-                }
-            }
+            closeException = CleanupUtils.close(writer, closeException);
         }
         if (closeException != null) {
-            throw closeException;
+            throw HyracksDataException.create(closeException);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index 5cced8d..c7d2d94 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -27,6 +27,7 @@ public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
     protected IFrameWriter writer;
     protected RecordDescriptor outputRecordDesc;
     protected boolean failed;
+    protected boolean isOpen;
 
     @Override
     public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
@@ -35,8 +36,24 @@ public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
     }
 
     @Override
+    public void open() throws HyracksDataException {
+        isOpen = true;
+        writer.open();
+    }
+
+    @Override
     public void fail() throws HyracksDataException {
         failed = true;
-        writer.fail();
+        if (isOpen) {
+            writer.fail();
+        }
+    }
+
+    protected void fail(Throwable failure) {
+        try {
+            fail();
+        } catch (Throwable th) {
+            failure.addSuppressed(th);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
index 35563e0..cccfd62 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -32,16 +32,6 @@ public abstract class AbstractOneInputSourcePushRuntime extends AbstractOneInput
     }
 
     @Override
-    public void close() throws HyracksDataException {
-        // close is a no op since this operator completes operating in open()
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        // fail is a no op since if a failure happened, the operator would've already called fail() on downstream
-    }
-
-    @Override
     public void flush() throws HyracksDataException {
         // flush will never be called on this runtime
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 159fde7..3cee12d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -151,7 +151,8 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
 
         @Override
         public void open() throws HyracksDataException {
-            writer.open();
+            // writer opened many times?
+            super.open();
             if (first) {
                 first = false;
                 initAccessAppendRef(ctx);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index f251bb7..2453029 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -27,6 +27,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
 import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
@@ -69,7 +70,7 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (frameSorter == null) {
                     IFrameBufferManager manager = new VariableFrameMemoryManager(
                             new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
@@ -87,11 +88,22 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime
 
             @Override
             public void close() throws HyracksDataException {
-                try {
-                    frameSorter.sort();
-                    frameSorter.flush(writer);
-                } finally {
-                    writer.close();
+                Throwable failure = null;
+                if (isOpen) {
+                    try {
+                        if (!failed) {
+                            frameSorter.sort();
+                            frameSorter.flush(writer);
+                        }
+                    } catch (Throwable th) {
+                        failure = th;
+                        fail(th);
+                    } finally {
+                        failure = CleanupUtils.close(writer, failure);
+                    }
+                }
+                if (failure != null) {
+                    throw HyracksDataException.create(failure);
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index b1b652f..5b36c5f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -96,7 +96,6 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
             private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
             private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
             private boolean first = true;
-            private boolean isOpen = false;
             private int tupleIndex = 0;
 
             @Override
@@ -109,15 +108,7 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                         eval[i] = evalFactories[i].createScalarEvaluator(ctx);
                     }
                 }
-                isOpen = true;
-                writer.open();
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                if (isOpen) {
-                    super.close();
-                }
+                super.open();
             }
 
             @Override
@@ -177,13 +168,6 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
             }
 
             @Override
-            public void fail() throws HyracksDataException {
-                if (isOpen) {
-                    super.fail();
-                }
-            }
-
-            @Override
             public void flush() throws HyracksDataException {
                 appender.flush(writer);
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 7bd924d..9ca3cd6 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -48,7 +48,7 @@ public class EmptyTupleSourceRuntimeFactory extends AbstractPushRuntimeFactory {
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                     throw new IllegalStateException();
                 }
@@ -56,13 +56,10 @@ public class EmptyTupleSourceRuntimeFactory extends AbstractPushRuntimeFactory {
             }
 
             @Override
-            public void fail() throws HyracksDataException {
-                writer.fail();
-            }
-
-            @Override
             public void close() throws HyracksDataException {
-                writer.close();
+                if (isOpen) {
+                    writer.close();
+                }
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index f94672d..832cb22 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -49,11 +49,6 @@ public class NestedTupleSourceRuntimeFactory extends AbstractPushRuntimeFactory
             initAccessAppend(ctx);
         }
 
-        @Override
-        public void open() throws HyracksDataException {
-            writer.open();
-        }
-
         public void writeTuple(ByteBuffer inputBuffer, int tIndex) throws HyracksDataException {
             tAccess.reset(inputBuffer);
             appendTupleToFrame(tIndex);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 33b7725..ca58d4d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -89,7 +89,6 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun
             private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
             private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
             private boolean first = true;
-            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -104,22 +103,7 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun
                 for (int i = 0; i < runningAggregates.length; i++) {
                     raggs[i].init();
                 }
-                isOpen = true;
-                writer.open();
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                if (isOpen) {
-                    super.close();
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                if (isOpen) {
-                    writer.fail();
-                }
+                super.open();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 59df402..aca5bf1 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -72,7 +72,7 @@ public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeF
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (evalMaxObjects == null) {
                     initAccessAppendRef(ctx);
                     evalMaxObjects = maxObjectsEvalFactory.createScalarEvaluator(ctx);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index a8ca082..713a99c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -53,7 +53,7 @@ public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntim
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (first) {
                     first = false;
                     initAccessAppend(ctx);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 171544d..933e640 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -92,8 +92,7 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
                     initAccessAppendFieldRef(ctx);
                     eval = cond.createScalarEvaluator(ctx);
                 }
-                writer.open();
-
+                super.open();
                 //prepare nullTupleBuilder
                 if (retainMissing && missingWriter == null) {
                     missingWriter = missingWriterFactory.createMissingWriter();
@@ -105,15 +104,6 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
             }
 
             @Override
-            public void close() throws HyracksDataException {
-                try {
-                    flushIfNotFailed();
-                } finally {
-                    writer.close();
-                }
-            }
-
-            @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 53974b2..7e5c346 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -43,6 +43,9 @@ public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRunt
     private char fieldDelimiter;
     private ITupleParserFactory parserFactory;
 
+    /*
+     * NOTE: This operator doesn't follow the IFrameWriter protocol
+     */
     public StringStreamingRuntimeFactory(String command, IPrinterFactory[] printerFactories, char fieldDelimiter,
             ITupleParserFactory parserFactory) {
         super(null);
@@ -129,7 +132,6 @@ public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRunt
                     first = false;
                     initAccessAppendRef(ctx);
                 }
-
                 try {
                     ITupleParser parser = parserFactory.createTupleParser(ctx);
                     process = Runtime.getRuntime().exec(command);
@@ -141,6 +143,7 @@ public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRunt
                             new DumpInStreamToPrintStream(process.getErrorStream(), System.err);
                     dumpStderr = new Thread(disps);
                     dumpStderr.start();
+                    super.open();
                 } catch (IOException e) {
                     throw HyracksDataException.create(e);
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2a7381a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 914f4a0..22189ac 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -94,7 +94,7 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
+                super.open();
                 if (tRef == null) {
                     initAccessAppendRef(ctx);
                 }