You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by il...@apache.org on 2015/10/09 09:37:27 UTC

incubator-asterixdb-hyracks git commit: ASTERIXDB-1002: Fix exception handling in EmptyTupleSourceRuntimeFactory

Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master c4118bd0c -> 87428ba48


ASTERIXDB-1002: Fix exception handling in EmptyTupleSourceRuntimeFactory

Revisiting the previous fix by calling fail() on pipeline

Change-Id: I19f8c8485e483e4d4efeff939e6bd82c7a04a101
Reviewed-on: https://asterix-gerrit.ics.uci.edu/443
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>
Reviewed-by: Ian Maxon <im...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/87428ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/87428ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/87428ba4

Branch: refs/heads/master
Commit: 87428ba48ec358ef6eb5d5e6875bf13b886b8f66
Parents: c4118bd
Author: Ildar Absalyamov <il...@gmail.com>
Authored: Fri Oct 9 00:06:31 2015 -0700
Committer: Ildar Absalyamov <il...@gmail.com>
Committed: Fri Oct 9 00:28:04 2015 -0700

----------------------------------------------------------------------
 .../meta/AlgebricksMetaOperatorDescriptor.java  | 27 ++++++++++++--------
 .../std/EmptyTupleSourceRuntimeFactory.java     | 19 +++++++-------
 .../nc/partitions/PipelinedPartition.java       |  4 ++-
 3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/87428ba4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 6618326..1a7150e 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -20,9 +20,6 @@ package org.apache.hyracks.algebricks.runtime.operators.meta;
 
 import java.nio.ByteBuffer;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
@@ -36,6 +33,8 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
@@ -93,10 +92,11 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
             final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
+            @Override
             public void initialize() throws HyracksDataException {
                 IFrameWriter startOfPipeline;
-                RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
-                        : null;
+                RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0
+                        ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null;
 
                 PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
                         pipelineOutputRecordDescriptor);
@@ -105,8 +105,13 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
                 } catch (AlgebricksException e) {
                     throw new HyracksDataException(e);
                 }
-                startOfPipeline.open();
-                startOfPipeline.close();
+                try {
+                    startOfPipeline.open();
+                } catch (HyracksDataException e) {
+                    startOfPipeline.fail();
+                } finally {
+                    startOfPipeline.close();
+                }
             }
         };
     }
@@ -120,10 +125,10 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
             @Override
             public void open() throws HyracksDataException {
                 if (startOfPipeline == null) {
-                    RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
-                            : null;
-                    RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
-                            AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
+                    RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0
+                            ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null;
+                    RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider
+                            .getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
                     PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
                             pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/87428ba4/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 5b66736..a2b9652 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -43,20 +43,21 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
     public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException {
         return new AbstractOneInputSourcePushRuntime() {
 
-            private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
-            private FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
+            private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
+            private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
 
             @Override
             public void open() throws HyracksDataException {
                 writer.open();
-                try {
-                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        throw new IllegalStateException();
-                    }
-                    appender.flush(writer, true);
-                } finally {
-                    writer.close();
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    throw new IllegalStateException();
                 }
+                appender.flush(writer, true);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                writer.close();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/87428ba4/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 7f6dd10..2ca1e0f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -81,7 +81,9 @@ public class PipelinedPartition implements IFrameWriter, IPartition {
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         ensureConnected();
-        delegate.nextFrame(buffer);
+        if (!failed) {
+            delegate.nextFrame(buffer);
+        }
     }
 
     private void ensureConnected() throws HyracksDataException {