You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2020/08/01 03:28:03 UTC
[asterixdb] branch master updated: [NO ISSUE][RT] Add Support For Feed Ingestion Without Message
This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5721ff2 [NO ISSUE][RT] Add Support For Feed Ingestion Without Message
5721ff2 is described below
commit 5721ff2b8629dc0972bdf6796531b0ec47c8b732
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Fri Jul 31 13:05:35 2020 -0700
[NO ISSUE][RT] Add Support For Feed Ingestion Without Message
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add an option to specify whether FeedMetaStoreNodePushable
should process a message as part of the feed data or not.
Change-Id: I804fb4ae884020906dc09be188fa976867662d40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7404
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <al...@gmail.com>
Reviewed-by: Michael Blow <mb...@apache.org>
---
.../org/apache/asterix/utils/FeedOperations.java | 4 ++--
.../operators/FeedMetaOperatorDescriptor.java | 10 +++++---
.../operators/FeedMetaStoreNodePushable.java | 27 +++++++++++++++-------
3 files changed, 28 insertions(+), 13 deletions(-)
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 8ea0ad4..20a97ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -309,7 +309,7 @@ public class FeedOperations {
if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
&& ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
- feedPolicyEntity.getProperties(), FeedRuntimeType.STORE);
+ feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, true);
opId = metaOp.getOperatorId();
opDesc.setOperatorId(opId);
} else {
@@ -323,7 +323,7 @@ public class FeedOperations {
// anything on the network interface needs to be message compatible
if (connectorDesc instanceof MToNPartitioningConnectorDescriptor) {
metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
- feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE);
+ feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE, true);
opId = metaOp.getOperatorId();
opDesc.setOperatorId(opId);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index 74858ce..3f02927 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -67,14 +67,17 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
private final Map<String, String> feedPolicyProperties;
/**
- * type for the feed runtime associated with the operator.
+ * Type for the feed runtime associated with the operator.
* Possible values: COMPUTE, STORE, OTHER
**/
private final FeedRuntimeType runtimeType;
+ /** Whether the incoming frame has a message that this operator should handle **/
+ private final boolean hasMessage;
+
public FeedMetaOperatorDescriptor(final JobSpecification spec, final FeedConnectionId feedConnectionId,
final IOperatorDescriptor coreOperatorDescriptor, final Map<String, String> feedPolicyProperties,
- final FeedRuntimeType runtimeType) {
+ final FeedRuntimeType runtimeType, boolean hasMessage) {
super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
this.feedConnectionId = feedConnectionId;
this.feedPolicyProperties = feedPolicyProperties;
@@ -83,6 +86,7 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
}
this.coreOperator = coreOperatorDescriptor;
this.runtimeType = runtimeType;
+ this.hasMessage = hasMessage;
}
@Override
@@ -97,7 +101,7 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
break;
case STORE:
nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
- coreOperator, feedConnectionId, feedPolicyProperties, this);
+ coreOperator, feedConnectionId, feedPolicyProperties, this, hasMessage);
break;
default:
throw new RuntimeDataException(ErrorCode.OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 94ae75c..7548313 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -51,8 +51,11 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
private static final Logger LOGGER = LogManager.getLogger();
+ /** Whether the incoming frame has a message that this operator should handle **/
+ private final boolean hasMessage;
+
/** Runtime node pushable corresponding to the core feed operator **/
- private AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
+ private final AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
/**
* A policy accessor that ensures dyanmic decisions for a feed are taken
@@ -92,10 +95,11 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
private final long traceCategory;
- public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
- Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor)
- throws HyracksDataException {
+ FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+ Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor,
+ boolean hasMessage) throws HyracksDataException {
+ this.hasMessage = hasMessage;
this.ctx = ctx;
this.insertOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
.createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
@@ -104,8 +108,13 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
this.connectionId = feedConnectionId;
this.feedManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
- this.message = new VSizeFrame(ctx);
- TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+ if (hasMessage) {
+ this.message = new VSizeFrame(ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+ } else {
+ this.message = null;
+ }
+
this.recordDescProvider = recordDescProvider;
this.opDesc = feedMetaOperatorDescriptor;
tracer = ctx.getJobletContext().getServiceContext().getTracer();
@@ -147,7 +156,9 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
long tid = tracer.durationB("Ingestion-Store", traceCategory, null);
try {
- FeedUtils.processFeedMessage(buffer, message, fta);
+ if (hasMessage) {
+ FeedUtils.processFeedMessage(buffer, message, fta);
+ }
writer.nextFrame(buffer);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Failure Processing a frame at store side", e);