You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2020/08/01 03:28:03 UTC

[asterixdb] branch master updated: [NO ISSUE][RT] Add Support For Feed Ingestion Without Message

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5721ff2  [NO ISSUE][RT] Add Support For Feed Ingestion Without Message
5721ff2 is described below

commit 5721ff2b8629dc0972bdf6796531b0ec47c8b732
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Fri Jul 31 13:05:35 2020 -0700

    [NO ISSUE][RT] Add Support For Feed Ingestion Without Message
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Add an option to specify whether FeedMetaStoreNodePushable
      should process a message as part of the feed data or not.
    
    Change-Id: I804fb4ae884020906dc09be188fa976867662d40
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7404
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
    Reviewed-by: Michael Blow <mb...@apache.org>
---
 .../org/apache/asterix/utils/FeedOperations.java   |  4 ++--
 .../operators/FeedMetaOperatorDescriptor.java      | 10 +++++---
 .../operators/FeedMetaStoreNodePushable.java       | 27 +++++++++++++++-------
 3 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 8ea0ad4..20a97ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -309,7 +309,7 @@ public class FeedOperations {
                 if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
                         && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
                     metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
-                            feedPolicyEntity.getProperties(), FeedRuntimeType.STORE);
+                            feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, true);
                     opId = metaOp.getOperatorId();
                     opDesc.setOperatorId(opId);
                 } else {
@@ -323,7 +323,7 @@ public class FeedOperations {
                             // anything on the network interface needs to be message compatible
                             if (connectorDesc instanceof MToNPartitioningConnectorDescriptor) {
                                 metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
-                                        feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE);
+                                        feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE, true);
                                 opId = metaOp.getOperatorId();
                                 opDesc.setOperatorId(opId);
                             }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index 74858ce..3f02927 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -67,14 +67,17 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
     private final Map<String, String> feedPolicyProperties;
 
     /**
-     * type for the feed runtime associated with the operator.
+     * Type for the feed runtime associated with the operator.
      * Possible values: COMPUTE, STORE, OTHER
      **/
     private final FeedRuntimeType runtimeType;
 
+    /** Whether the incoming frame has a message that this operator should handle **/
+    private final boolean hasMessage;
+
     public FeedMetaOperatorDescriptor(final JobSpecification spec, final FeedConnectionId feedConnectionId,
             final IOperatorDescriptor coreOperatorDescriptor, final Map<String, String> feedPolicyProperties,
-            final FeedRuntimeType runtimeType) {
+            final FeedRuntimeType runtimeType, boolean hasMessage) {
         super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
         this.feedConnectionId = feedConnectionId;
         this.feedPolicyProperties = feedPolicyProperties;
@@ -83,6 +86,7 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
         }
         this.coreOperator = coreOperatorDescriptor;
         this.runtimeType = runtimeType;
+        this.hasMessage = hasMessage;
     }
 
     @Override
@@ -97,7 +101,7 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
                 break;
             case STORE:
                 nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
-                        coreOperator, feedConnectionId, feedPolicyProperties, this);
+                        coreOperator, feedConnectionId, feedPolicyProperties, this, hasMessage);
                 break;
             default:
                 throw new RuntimeDataException(ErrorCode.OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 94ae75c..7548313 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -51,8 +51,11 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
     private static final Logger LOGGER = LogManager.getLogger();
 
+    /** Whether the incoming frame has a message that this operator should handle **/
+    private final boolean hasMessage;
+
     /** Runtime node pushable corresponding to the core feed operator **/
-    private AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
+    private final AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
 
     /**
      * A policy accessor that ensures dyanmic decisions for a feed are taken
@@ -92,10 +95,11 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
     private final long traceCategory;
 
-    public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor)
-            throws HyracksDataException {
+    FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
+            int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+            Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor,
+            boolean hasMessage) throws HyracksDataException {
+        this.hasMessage = hasMessage;
         this.ctx = ctx;
         this.insertOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
                 .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
@@ -104,8 +108,13 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         this.connectionId = feedConnectionId;
         this.feedManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
                 .getApplicationContext()).getActiveManager();
-        this.message = new VSizeFrame(ctx);
-        TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+        if (hasMessage) {
+            this.message = new VSizeFrame(ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+        } else {
+            this.message = null;
+        }
+
         this.recordDescProvider = recordDescProvider;
         this.opDesc = feedMetaOperatorDescriptor;
         tracer = ctx.getJobletContext().getServiceContext().getTracer();
@@ -147,7 +156,9 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         long tid = tracer.durationB("Ingestion-Store", traceCategory, null);
         try {
-            FeedUtils.processFeedMessage(buffer, message, fta);
+            if (hasMessage) {
+                FeedUtils.processFeedMessage(buffer, message, fta);
+            }
             writer.nextFrame(buffer);
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Failure Processing a frame at store side", e);