You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/05/17 21:46:33 UTC

[bahir-flink] branch master updated: [BAHIR-177] Fixes state recovery/size of the recovered queue

This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd02c6d  [BAHIR-177] Fixes state recovery/size of the recovered queue
dd02c6d is described below

commit dd02c6dfda2a1bd42a6f79ec4c7ba273216640a0
Author: Dominik Wosin <bl...@gmail.com>
AuthorDate: Mon Apr 1 23:41:18 2019 +0200

    [BAHIR-177] Fixes state recovery/size of the recovered queue
    
    Two issues are meant to be fixed in this PR:
    
    - As described in BAHIR-177, the state recovery of Bahir
    operators currently depends on randomly generated IDs,
    which makes it practically impossible to recover state
    properly. This change makes the operators use the
    outStreamId instead of randomly generated names.
    
    - The initial capacity of the queue recovered in
    restoreQueuerState() was set to the actual size (number
    of elements) of the snapshot queue. If the snapshot was
    empty, the method would try to create a queue with an
    initial capacity of 0, which java.util.PriorityQueue
    rejects with an IllegalArgumentException (capacity < 1).
    
    Closes #51
---
 .../flink/streaming/siddhi/SiddhiStream.java       |  8 ++++----
 .../siddhi/operator/AbstractSiddhiOperator.java    | 24 +++++++++++++++++-----
 .../siddhi/operator/SiddhiStreamOperator.java      |  9 ++++----
 .../siddhi/utils/SiddhiStreamFactory.java          |  4 ++--
 4 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
index 43d7436..ca61a0a 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
@@ -236,7 +236,7 @@ public abstract class SiddhiStream {
             TypeInformation<T> typeInformation =
                 SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getFinalExecutionPlan(), outStreamId);
             siddhiContext.setOutputStreamType(typeInformation);
-            return returnsInternal(siddhiContext);
+            return returnsInternal(siddhiContext, outStreamId);
         }
 
         /**
@@ -269,11 +269,11 @@ public abstract class SiddhiStream {
             siddhiContext.setOutputStreamType(typeInformation);
             siddhiContext.setExtensions(environment.getExtensions());
             siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
-            return returnsInternal(siddhiContext);
+            return returnsInternal(siddhiContext, outStreamId);
         }
 
-        private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext) {
-            return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream);
+        private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext, String outStreamId) {
+            return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream, outStreamId);
         }
     }
 }
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
index 8cb6d67..79df6ac 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -20,8 +20,10 @@ package org.apache.flink.streaming.siddhi.operator;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
@@ -51,7 +53,11 @@ import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.core.SiddhiAppRuntime;
 import org.wso2.siddhi.core.SiddhiManager;
 import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.SiddhiApp;
+import org.wso2.siddhi.query.api.annotation.Annotation;
+import org.wso2.siddhi.query.api.annotation.Element;
 import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.compiler.SiddhiCompiler;
 
 /**
  * <h1>Siddhi Runtime Operator</h1>
@@ -86,10 +92,10 @@ import org.wso2.siddhi.query.api.definition.AbstractDefinition;
 public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT>
     implements OneInputStreamOperator<IN, OUT> {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
-    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+    protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
     private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
     private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";
-
+    protected final String operatorName;
     private final SiddhiOperatorContext siddhiPlan;
     private final String executionExpression;
     private final boolean isProcessingTime;
@@ -108,13 +114,13 @@ public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOper
     /**
      * @param siddhiPlan Siddhi CEP  Execution Plan
      */
-    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan, String operatorName) {
         validate(siddhiPlan);
         this.executionExpression = siddhiPlan.getFinalExecutionPlan();
         this.siddhiPlan = siddhiPlan;
         this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
         this.streamRecordSerializers = new HashMap<>();
-
+        this.operatorName = operatorName;
         registerStreamRecordSerializers();
     }
 
@@ -228,7 +234,15 @@ public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOper
             for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
                 this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
             }
-            this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(executionExpression);
+
+            SiddhiApp siddhiApp = SiddhiCompiler.parse(executionExpression);
+            Annotation nameAnnotation = new Annotation("Name");
+            Element element = new Element(null, operatorName);
+            List<Element> elements = new ArrayList<>();
+            elements.add(element);
+            nameAnnotation.setElements(elements);
+            siddhiApp.getAnnotations().add(nameAnnotation);
+            this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
             this.siddhiRuntime.start();
             registerInputAndOutput(this.siddhiRuntime);
             LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
index 5c54ad8..0ce719c 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
@@ -36,8 +36,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  */
 public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {
 
-    public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan) {
-        super(siddhiPlan);
+    public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan, String operatorName) {
+        super(siddhiPlan, operatorName);
     }
 
     @Override
@@ -68,9 +68,10 @@ public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2
 
     @Override
     protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
-        int sizeOfQueue = dataInputView.readInt();
+        int snapshotSize = dataInputView.readInt();
+        int sizeOfQueue = snapshotSize > 0 ? snapshotSize : this.INITIAL_PRIORITY_QUEUE_CAPACITY;
         PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<>(sizeOfQueue);
-        for (int i = 0; i < sizeOfQueue; i++) {
+        for (int i = 0; i < snapshotSize; i++) {
             String streamId = dataInputView.readUTF();
             StreamElement streamElement = getStreamRecordSerializer(streamId).deserialize(dataInputView);
             priorityQueue.offer(streamElement.<Tuple2<String, IN>>asRecord());
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
index 20ca535..d11f029 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
  */
 public class SiddhiStreamFactory {
     @SuppressWarnings("unchecked")
-    public static <OUT> DataStream<OUT> createDataStream(SiddhiOperatorContext context, DataStream<Tuple2<String, Object>> namedStream) {
-        return namedStream.transform(context.getName(), context.getOutputStreamType(), new SiddhiStreamOperator(context));
+    public static <OUT> DataStream<OUT> createDataStream(SiddhiOperatorContext context, DataStream<Tuple2<String, Object>> namedStream, String outStreamId) {
+        return namedStream.transform(context.getName(), context.getOutputStreamType(), new SiddhiStreamOperator(context,outStreamId));
     }
 }