You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2018/06/20 02:15:52 UTC

asterixdb-bad git commit: Improve performance of NotifyBrokerRuntime code

Repository: asterixdb-bad
Updated Branches:
  refs/heads/master 139909bb3 -> 105ee6d16


Improve performance of NotifyBrokerRuntime code

Change-Id: Ia4ecd381d102c67f7c66cfa965312bfb885aa281


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/105ee6d1
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/105ee6d1
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/105ee6d1

Branch: refs/heads/master
Commit: 105ee6d1676e2f75c1c1b3d3996ad364ff6b414a
Parents: 139909b
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Tue Jun 19 17:22:24 2018 -0700
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Tue Jun 19 17:22:24 2018 -0700

----------------------------------------------------------------------
 asterix-bad/pom.xml                             |  10 ++
 .../bad/runtime/NotifyBrokerRuntime.java        | 122 ++++++++++---------
 2 files changed, 77 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/105ee6d1/asterix-bad/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index b7d775a..cc8204e 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -276,6 +276,16 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-data</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-control-common</artifactId>
       <version>${hyracks.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/105ee6d1/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 6ffb244..8e07af2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -19,34 +19,33 @@
 
 package org.apache.asterix.bad.runtime;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AOrderedlistPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARecordPrinterFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AUUID;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -58,15 +57,19 @@ import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
 
 public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
     private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName());
 
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream di = new DataInputStream(bbis);
-    private final AOrderedListSerializerDeserializer subSerDes =
-            new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
-    private final ARecordSerializerDeserializer recordSerDes;
+    private static final AStringSerializerDeserializer stringSerDes =
+            new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+
+    private final IPrinter recordPrinterFactory;
+    private final IPrinter subscriptionIdListPrinterFactory;
 
     private IPointable inputArg0 = new VoidPointable();
     private IPointable inputArg1 = new VoidPointable();
@@ -74,14 +77,14 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu
     private IScalarEvaluator eval0;
     private IScalarEvaluator eval1;
     private IScalarEvaluator eval2;
-    private final ActiveManager activeManager;
     private final EntityId entityId;
     private final boolean push;
-    private AOrderedList pushList;
-    private ARecord pushRecord;
-    private final IAType recordType;
-    private final Map<String, HashSet<String>> sendData = new HashMap<>();
+    private final Map<String, String> sendData = new HashMap<>();
+    private final Map<String, ByteArrayOutputStream> sendbaos = new HashMap<>();
+    private final Map<String, PrintStream> sendStreams = new HashMap<>();
     private String executionTimeString;
+    private boolean firstResult = true;
+    String endpoint;
 
     public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
             IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
@@ -90,14 +93,11 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu
         eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
         eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
         eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
-        this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
-                .getApplicationContext()).getActiveManager();
         this.entityId = activeJobId;
         this.push = push;
-        this.pushList = null;
-        this.pushRecord = null;
-        this.recordType = recordType;
-        recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType);
+        recordPrinterFactory = new ARecordPrinterFactory((ARecordType) recordType).createPrinter();
+        subscriptionIdListPrinterFactory =
+                new AOrderedlistPrinterFactory(new AOrderedListType(BuiltinType.AUUID, null)).createPrinter();
         executionTimeString = null;
     }
 
@@ -106,28 +106,18 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu
         return;
     }
 
-    private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) {
-        for (int i = 0; i < subscriptionIds.size(); i++) {
-            AUUID subId = (AUUID) subscriptionIds.getItem(i);
-            String subscriptionString = subId.toString();
-            //Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
-            subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
-            subscriptionString = "\"" + subscriptionString + "\"";
-            sendData.get(endpoint).add(subscriptionString);
-        }
-    }
-
     public String createData(String endpoint) {
-        String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
-                + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
-                + executionTimeString + "\", \"subscriptionIds\":[";
-        for (String value : sendData.get(endpoint)) {
-            JSON += value;
-            JSON += ",";
+        // JSON key of the payload array; it must be a fully quoted name or the message is not valid JSON.
+        String resultTitle = "\"subscriptionIds\"";
+        if (push) {
+            resultTitle = "\"results\"";
         }
-        JSON = JSON.substring(0, JSON.length() - 1);
-        JSON += "]}";
-        return JSON;
+        String jsonStr = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
+                + entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+                + executionTimeString + "\", " + resultTitle + ":[";
+        // sendData holds this endpoint's entries, already printed and comma-separated, so no trimming is needed.
+        jsonStr += sendData.get(endpoint);
+        return jsonStr + "]}";
 
     }
 
@@ -172,6 +162,11 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu
             eval1.evaluate(tRef, inputArg1);
             eval2.evaluate(tRef, inputArg2);
 
+            /*The incoming tuples have three fields:
+             1. eval0 will get the serialized broker endpoint string
+             2. eval1 will get the payload (either the subscriptionIds or entire results)
+             3. eval2 will get the channel execution time stamp (the same for all tuples)
+            */
             if (executionTimeString == null) {
                 int resultSetOffset = inputArg2.getStartOffset();
                 bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
@@ -185,34 +180,51 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu
 
             int serBrokerOffset = inputArg0.getStartOffset();
             bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
-            String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
-            sendData.putIfAbsent(endpoint, new HashSet<>());
+            endpoint = stringSerDes.deserialize(di).getStringValue();
+            sendbaos.putIfAbsent(endpoint, new ByteArrayOutputStream());
+            try {
+                sendStreams.putIfAbsent(endpoint,
+                        new PrintStream(sendbaos.get(endpoint), true, StandardCharsets.UTF_8.name()));
+            } catch (UnsupportedEncodingException e) {
+                throw new HyracksDataException(e.getMessage());
+            }
 
             if (push) {
                 int pushOffset = inputArg1.getStartOffset();
                 bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
-                //TODO: Right now this creates an object per channel result. Need to find a better way to deserialize
-                pushRecord = recordSerDes.deserialize(di);
-                sendData.get(endpoint).add(pushRecord.toString());
+                if (!firstResult) {
+                    sendStreams.get(endpoint).append(',');
+                }
+                recordPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(), inputArg1.getLength(),
+                        sendStreams.get(endpoint));
 
             } else {
-                int serSubOffset = inputArg1.getStartOffset();
-                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
-                pushList = subSerDes.deserialize(di);
-                addSubscriptions(endpoint, pushList);
+                if (!firstResult) {
+                    sendStreams.get(endpoint).append(',');
+                }
+                subscriptionIdListPrinterFactory.print(inputArg1.getByteArray(), inputArg1.getStartOffset(),
+                        inputArg1.getLength(),
+                        sendStreams.get(endpoint));
             }
+            firstResult = false;
         }
 
     }
 
     @Override
     public void close() throws HyracksDataException {
-        for (String endpoint : sendData.keySet()) {
-            if (sendData.get(endpoint).size() > 0) {
-                sendGroupOfResults(endpoint);
-                sendData.get(endpoint).clear();
+        for (String endpoint : sendStreams.keySet()) {
+            sendData.put(endpoint, new String(sendbaos.get(endpoint).toByteArray(), StandardCharsets.UTF_8));
+            sendGroupOfResults(endpoint);
+            sendStreams.get(endpoint).close();
+            try {
+                sendbaos.get(endpoint).close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e.getMessage());
             }
+
         }
+
         return;
     }