You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2017/10/18 10:43:02 UTC

asterixdb git commit: [NO ISSUE][OTH] Appender flush call with tracing call normal flush

Repository: asterixdb
Updated Branches:
  refs/heads/master bc918f7ff -> 536c707dc


[NO ISSUE][OTH] Appender flush call with tracing call normal flush

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- The flush variant that takes a tracer now delegates to the normal flush.
  This lets appender subclasses that override flush remain correct.

Change-Id: I3f649798fa4cac049f66cc3621acdb28b1c94694
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2080
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/536c707d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/536c707d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/536c707d

Branch: refs/heads/master
Commit: 536c707dc2ffc92a9e2f331b8765697367d9ab3a
Parents: bc918f7
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Tue Oct 17 22:11:01 2017 -0700
Committer: Michael Blow <mb...@apache.org>
Committed: Wed Oct 18 03:42:36 2017 -0700

----------------------------------------------------------------------
 .../ConnectorDescriptorWithMessagingTest.java   | 27 +++++++++++---------
 .../common/comm/io/AbstractFrameAppender.java   |  8 +++---
 .../std/connectors/PartitionDataWriter.java     |  9 +++++++
 3 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/536c707d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index b1c7ff3..56a45c6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -49,6 +49,7 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter;
 import org.apache.hyracks.test.support.TestUtils;
+import org.apache.hyracks.util.trace.ITracer;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -81,8 +82,9 @@ public class ConnectorDescriptorWithMessagingTest {
                     BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
             RecordDescriptor rDesc = new RecordDescriptor(serdes);
             TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
-            IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
-                    CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+            PartitionWithMessageDataWriter partitioner =
+                    (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+                            CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
             List<TestFrameWriter> recipients = new ArrayList<>();
             try {
                 partitioner.open();
@@ -90,7 +92,7 @@ public class ConnectorDescriptorWithMessagingTest {
                 for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
                     recipients.add((TestFrameWriter) writer);
                 }
-                partitioner.flush();
+                partitioner.flush(ITracer.NONE, null, null, null);
                 for (TestFrameWriter writer : recipients) {
                     Assert.assertEquals(writer.nextFrameCount(), 1);
                     fta.reset(writer.getLastFrame());
@@ -102,7 +104,7 @@ public class ConnectorDescriptorWithMessagingTest {
                 message.getBuffer().clear();
                 message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
                 message.getBuffer().flip();
-                partitioner.flush();
+                partitioner.flush(ITracer.NONE, null, null, null);;
                 for (TestFrameWriter writer : recipients) {
                     Assert.assertEquals(writer.nextFrameCount(), 2);
                     fta.reset(writer.getLastFrame());
@@ -115,7 +117,7 @@ public class ConnectorDescriptorWithMessagingTest {
                 message.getBuffer().clear();
                 message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
                 message.getBuffer().flip();
-                partitioner.flush();
+                partitioner.flush(ITracer.NONE, null, null, null);;
                 for (TestFrameWriter writer : recipients) {
                     Assert.assertEquals(writer.nextFrameCount(), 3);
                     fta.reset(writer.getLastFrame());
@@ -159,15 +161,16 @@ public class ConnectorDescriptorWithMessagingTest {
                     BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
             RecordDescriptor rDesc = new RecordDescriptor(serdes);
             TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
-            IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER,
-                    NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+            PartitionWithMessageDataWriter partitioner =
+                    (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+                            CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
             partitioner.open();
             FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
             List<TestFrameWriter> recipients = new ArrayList<>();
             for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
                 recipients.add((TestFrameWriter) writer);
             }
-            partitioner.flush();
+            partitioner.flush(ITracer.NONE, null, null, null);;
             for (TestFrameWriter writer : recipients) {
                 Assert.assertEquals(writer.nextFrameCount(), 1);
                 fta.reset(writer.getLastFrame());
@@ -179,7 +182,7 @@ public class ConnectorDescriptorWithMessagingTest {
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
             message.getBuffer().flip();
-            partitioner.flush();
+            partitioner.flush(ITracer.NONE, null, null, null);;
             for (TestFrameWriter writer : recipients) {
                 Assert.assertEquals(writer.nextFrameCount(), 2);
                 fta.reset(writer.getLastFrame());
@@ -191,7 +194,7 @@ public class ConnectorDescriptorWithMessagingTest {
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
-            partitioner.flush();
+            partitioner.flush(ITracer.NONE, null, null, null);;
             for (TestFrameWriter writer : recipients) {
                 Assert.assertEquals(writer.nextFrameCount(), 3);
                 fta.reset(writer.getLastFrame());
@@ -262,7 +265,7 @@ public class ConnectorDescriptorWithMessagingTest {
                 tuple = ttg.next();
             }
             partitioner.nextFrame(frame.getBuffer());
-            partitioner.flush();
+            partitioner.flush(ITracer.NONE, null, null, null);;
             Assert.assertEquals(1, partitionWriterFactory.getWriters().get(0).nextFrameCount());
             Assert.assertEquals(2, partitionWriterFactory.getWriters().get(1).nextFrameCount());
             Assert.assertEquals(1, partitionWriterFactory.getWriters().get(2).nextFrameCount());
@@ -321,7 +324,7 @@ public class ConnectorDescriptorWithMessagingTest {
                 appender.append(tuple);
             }
             partitioner.nextFrame(frame.getBuffer());
-            partitioner.flush();
+            partitioner.flush(ITracer.NONE, null, null, null);;
             Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1);
             Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 1);
             Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/536c707d/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index 13632f0..a377d75 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -120,10 +120,10 @@ public class AbstractFrameAppender implements IFrameAppender {
     public void flush(IFrameWriter writer, ITracer tracer, String name, String cat, String args)
             throws HyracksDataException {
         final long tid = ITracer.check(tracer).durationB(name, cat, args);
-        if (tupleCount > 0) {
-            write(writer, true);
+        try {
+            flush(writer);
+        } finally {
+            ITracer.check(tracer).durationE(tid, args);
         }
-        writer.flush();
-        ITracer.check(tracer).durationE(tid, args);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/536c707d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 189ce9d..4705001 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -31,6 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.util.trace.ITracer;
 
 public class PartitionDataWriter implements IFrameWriter {
     private final int consumerPartitionCount;
@@ -163,6 +164,14 @@ public class PartitionDataWriter implements IFrameWriter {
         }
     }
 
+    public void flush(ITracer tracer, String name, String cat, String args) throws HyracksDataException {
+        for (int i = 0; i < consumerPartitionCount; i++) {
+            if (allocatedFrames[i]) {
+                appenders[i].flush(pWriters[i], tracer, name, cat, args);
+            }
+        }
+    }
+
     // Wraps the current encountered exception into the final exception.
     private HyracksDataException wrapException(HyracksDataException finalException, Exception currentException) {
         if (finalException == null) {