You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/10/20 20:47:14 UTC
[incubator-streampipes] 03/03: [STREAMPIPES-250] Refactor Siddhi Wrapper
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 8c5beb86f5c5e5cf36d39476b7e042674bf2d296
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Tue Oct 20 22:40:39 2020 +0200
[STREAMPIPES-250] Refactor Siddhi Wrapper
---
streampipes-wrapper-siddhi/pom.xml | 6 ++
.../wrapper/siddhi/constants/SiddhiConstants.java | 2 +
.../wrapper/siddhi/engine/SiddhiEngine.java | 23 ++++---
.../wrapper/siddhi/engine/SiddhiEventEngine.java | 55 ++--------------
.../siddhi/engine/SiddhiInvocationConfig.java | 74 ----------------------
.../siddhi/engine/SiddhiStatementGenerator.java | 7 +-
.../siddhi/engine/StreamPipesSiddhiProcessor.java | 5 +-
.../callback/SiddhiOutputStreamCallback.java | 17 +++--
.../engine/generator/EventTypeGenerator.java | 8 ++-
.../engine/generator/SiddhiAppGenerator.java | 22 +++----
.../generator/SiddhiInvocationConfigGenerator.java | 65 +++++++++++++++++++
.../wrapper/siddhi/model/EventType.java | 27 ++++----
.../SiddhiProcessorParams.java} | 72 ++++++++-------------
.../AbstractQueryGenerator.java} | 19 ++++--
.../FromClause.java} | 13 +---
.../SelectClause.java} | 16 +++--
.../EventType.java => query/SiddhiStatement.java} | 29 ++++-----
.../expression/Expression.java} | 17 +++--
.../expression/Expressions.java} | 18 +++---
.../expression/PropertyExpression.java} | 13 +---
.../wrapper/siddhi/utils/SiddhiUtils.java | 25 +++++---
21 files changed, 237 insertions(+), 296 deletions(-)
diff --git a/streampipes-wrapper-siddhi/pom.xml b/streampipes-wrapper-siddhi/pom.xml
index 1a8ce3a..fdbad74 100644
--- a/streampipes-wrapper-siddhi/pom.xml
+++ b/streampipes-wrapper-siddhi/pom.xml
@@ -52,6 +52,12 @@
<groupId>io.siddhi</groupId>
<artifactId>siddhi-query-compiler</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.siddhi.extension.execution.list</groupId>
+ <artifactId>siddhi-execution-list</artifactId>
+ <version>1.0.1</version>
+ </dependency>
+
</dependencies>
<repositories>
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
index 3d2a2a3..146be5a 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/constants/SiddhiConstants.java
@@ -21,6 +21,8 @@ public class SiddhiConstants {
public static final String SELECT = "select";
public static final String INSERT = "insert";
+ public static final String WHITESPACE = " ";
+ public static final String ASTERISK = "*";
public static final String FIRST_STREAM_PREFIX = "s0";
public static final String SECOND_STREAM_PREFIX = "s1";
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
index 173f30f..dcc7eed 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
@@ -21,14 +21,17 @@ import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiOutputStreamCallback;
import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiOutputStreamDebugCallback;
-import org.apache.streampipes.wrapper.siddhi.model.EventType;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
import org.apache.streampipes.wrapper.siddhi.manager.SpSiddhiManager;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,18 +63,18 @@ public class SiddhiEngine {
this.debugMode = true;
}
- public void initializeEngine(SiddhiInvocationConfig<? extends EventProcessorBindingParams> settings,
+ public void initializeEngine(SiddhiInvocationConfigGenerator<? extends EventProcessorBindingParams> settings,
SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) {
- EventProcessorBindingParams params = settings.getParams();
- this.typeInfo = settings.getEventTypeInfo();
+ EventProcessorBindingParams params = settings.getSiddhiProcessorParams().getParams();
+ this.typeInfo = settings.getSiddhiProcessorParams().getEventTypeInfo();
SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
//this.timestampField = removeStreamIdFromTimestamp(setTimestamp(parameters));
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(settings.getSiddhiAppString());
- settings.getParams()
+ settings.getSiddhiProcessorParams().getParams()
.getInEventTypes()
.forEach((key, value) -> {
String preparedKey = SiddhiUtils.prepareName(key);
@@ -79,14 +82,16 @@ public class SiddhiEngine {
});
StreamCallback callback;
-
+ Map<String, StreamDefinition> streamDef = siddhiAppRuntime.getStreamDefinitionMap();
+ String outputKey = SiddhiUtils.getPreparedOutputTopicName(params);
+ List<Attribute> streamAttributes = streamDef.get(outputKey).getAttributeList();
if (!debugMode) {
- callback = new SiddhiOutputStreamCallback(spOutputCollector, settings.getOutputEventKeys(), runtimeContext);
+ callback = new SiddhiOutputStreamCallback(spOutputCollector, runtimeContext, streamAttributes);
} else {
callback = new SiddhiOutputStreamDebugCallback(debugCallback);
}
- siddhiAppRuntime.addCallback(SiddhiUtils.prepareName(SiddhiUtils.getOutputTopicName(params)), callback);
+ siddhiAppRuntime.addCallback(SiddhiUtils.getPreparedOutputTopicName(params), callback);
siddhiAppRuntime.start();
}
@@ -97,7 +102,7 @@ public class SiddhiEngine {
List<String> eventKeys = this.typeInfo
.get(sourceId)
.stream()
- .map(EventType::getEventTypeName)
+ .map(EventType::getFieldName)
.collect(Collectors.toList());
inputHandler.send(SiddhiUtils.toObjArr(eventKeys, event.getRaw()));
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
index 585ac5a..a6ce6e1 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
@@ -17,29 +17,22 @@
*/
package org.apache.streampipes.wrapper.siddhi.engine;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.runtime.EventProcessor;
-import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.InputStreamNameGenerator;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-
public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> implements
EventProcessor<B>, SiddhiStatementGenerator<B> {
private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);
private SiddhiEngine siddhiEngine;
- private SiddhiInvocationConfig<B> siddhiConfig;
- private List<String> outputEventKeys = new ArrayList<>();
public SiddhiEventEngine() {
this.siddhiEngine = new SiddhiEngine();
@@ -51,12 +44,9 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
@Override
public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
- List<String> inputStreamNames = new InputStreamNameGenerator<>(parameters).generateInputStreamNames();
- this.outputEventKeys = new ArrayList<>(parameters.getOutEventType().keySet());
- String fromStatement = fromStatement(inputStreamNames, parameters);
- String selectStatement = selectStatement(outputEventKeys, parameters);
- this.siddhiConfig = new SiddhiInvocationConfig<>(parameters, fromStatement, selectStatement, inputStreamNames);
- this.siddhiEngine.initializeEngine(this.siddhiConfig, spOutputCollector, runtimeContext);
+ SiddhiInvocationConfigGenerator<B> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
+ this::fromStatement, this::selectStatement);
+ this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
}
@Override
@@ -69,45 +59,8 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
this.siddhiEngine.shutdownEngine();
}
- public SiddhiInvocationConfig<B> getSiddhiConfig() {
- return this.siddhiConfig;
- }
-
public String prepareName(String name) {
return SiddhiUtils.prepareName(name);
}
- public String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
- StringBuilder selectString = new StringBuilder();
- selectString.append(SiddhiConstants.SELECT).append(" ");
-
- if (outputEventKeys.size() > 0) {
- for (int i=0; i<outputEventKeys.size() - 1; i++) {
- selectString
- .append(SiddhiConstants.FIRST_STREAM_PREFIX)
- .append(outputEventKeys.get(i))
- .append(",");
- }
- selectString
- .append(SiddhiConstants.FIRST_STREAM_PREFIX)
- .append(outputEventKeys.get(outputEventKeys.size() - 1));
- }
- return selectString.toString();
- }
-
- public String getCustomOutputSelectStatement(DataProcessorInvocation invocation,
- String eventName) {
- StringBuilder selectString = new StringBuilder();
- selectString.append(SiddhiConstants.SELECT).append(" ");
-
- if (outputEventKeys.size() > 0) {
- for (int i = 0; i < outputEventKeys.size() - 1; i++) {
- selectString.append(eventName).append(".s0").append(outputEventKeys.get(i)).append(",");
- }
- selectString.append(eventName).append(".s0").append(outputEventKeys.get(outputEventKeys.size() - 1));
-
- }
-
- return selectString.toString();
- }
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java
deleted file mode 100644
index 62eb29c..0000000
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiInvocationConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.wrapper.siddhi.engine;
-
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.EventTypeGenerator;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiAppGenerator;
-import org.apache.streampipes.wrapper.siddhi.model.EventType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class SiddhiInvocationConfig<B extends EventProcessorBindingParams> {
-
- private final B params;
- private final String siddhiAppString;
- private final List<String> inputStreamNames;
- private final Map<String, List<EventType>> eventTypeInfo;
- private final List<String> outputEventKeys;
-
- public SiddhiInvocationConfig(B params,
- String fromStatement,
- String selectStatement,
- List<String> inputStreamNames) {
- this.params = params;
- this.inputStreamNames = inputStreamNames;
- this.eventTypeInfo = new EventTypeGenerator<>(params).generateEventTypes();
- this.siddhiAppString = new SiddhiAppGenerator<>(params, inputStreamNames, eventTypeInfo, fromStatement, selectStatement)
- .generateSiddhiApp();
- this.outputEventKeys = new ArrayList<>(this.params.getOutEventType().keySet());
- }
-
- public B getParams() {
- return params;
- }
-
- public String getSiddhiAppString() {
- return siddhiAppString;
- }
-
- public List<String> getInputStreamNames() {
- return inputStreamNames;
- }
-
- public Map<String, List<EventType>> getEventTypeInfo() {
- return eventTypeInfo;
- }
-
- public List<String> getOutputEventKeys() {
- return this.outputEventKeys;
- }
-
- public String getSiddhiApp() {
- return this.siddhiAppString;
- }
-
-
-}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
index d7adcb9..9abc569 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
@@ -18,13 +18,12 @@
package org.apache.streampipes.wrapper.siddhi.engine;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
- String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+ String fromStatement(SiddhiProcessorParams<B> siddhiParams);
- String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+ String selectStatement(SiddhiProcessorParams<B> siddhiParams);
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
index d603e9a..2aabb4c 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
+import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
@@ -39,7 +40,9 @@ public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcesso
@Override
public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
+ SiddhiInvocationConfigGenerator<ProcessorParams> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
+ this::fromStatement, this::selectStatement);
+ this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
}
@Override
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
index 947e99a..95dcf1e 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/callback/SiddhiOutputStreamCallback.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.wrapper.siddhi.engine.callback;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.Attribute;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
@@ -28,24 +29,26 @@ import java.util.List;
public class SiddhiOutputStreamCallback extends StreamCallback {
private SpOutputCollector collector;
- private List<String> outputEventKeys;
private EventProcessorRuntimeContext runtimeContext;
+ private List<Attribute> streamAttributes;
+
public SiddhiOutputStreamCallback(SpOutputCollector collector,
- List<String> outputEventKeys,
- EventProcessorRuntimeContext runtimeContext) {
+ EventProcessorRuntimeContext runtimeContext,
+ List<Attribute> streamAttributes) {
this.collector = collector;
- this.outputEventKeys = outputEventKeys;
this.runtimeContext = runtimeContext;
+ this.streamAttributes = streamAttributes;
}
@Override
public void receive(Event[] events) {
if (events.length > 0) {
Event lastEvent = events[events.length - 1];
- collector.collect(SiddhiUtils.toSpEvent(lastEvent, outputEventKeys,
- runtimeContext.getOutputSchemaInfo
- (), runtimeContext.getOutputSourceInfo()));
+ collector.collect(SiddhiUtils.toSpEvent(lastEvent,
+ runtimeContext.getOutputSchemaInfo(),
+ runtimeContext.getOutputSourceInfo(),
+ streamAttributes));
}
}
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
index 1aa95a4..7c1d28c 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/EventTypeGenerator.java
@@ -40,7 +40,7 @@ public class EventTypeGenerator<B extends EventProcessorBindingParams> {
List<EventType> sortedEventKeys = new ArrayList<>();
for (String propertyKey : value.keySet()) {
sortedEventKeys.add(makeEventType(currentStreamIndex.get(), propertyKey, value.get(propertyKey)));
- sortedEventKeys.sort(Comparator.comparing(EventType::getEventTypeName));
+ sortedEventKeys.sort(Comparator.comparing(EventType::getFieldName));
}
listOfEventKeys.put(key, sortedEventKeys);
currentStreamIndex.getAndSet(currentStreamIndex.get() + 1);
@@ -50,7 +50,7 @@ public class EventTypeGenerator<B extends EventProcessorBindingParams> {
}
private EventType makeEventType(Integer currentStreamIndex, String propertyName, Object propertyType) {
- return new EventType(currentStreamIndex, propertyName, toType((Class<?>) propertyType));
+ return new EventType(toSelectorPrefix(currentStreamIndex), propertyName, toType((Class<?>) propertyType));
}
private String toType(Class<?> o) {
@@ -70,4 +70,8 @@ public class EventTypeGenerator<B extends EventProcessorBindingParams> {
return SiddhiConstants.SIDDHI_OBJECT_TYPE;
}
}
+
+ private String toSelectorPrefix(Integer currentStreamIndex) {
+ return currentStreamIndex == 0 ? SiddhiConstants.FIRST_STREAM_PREFIX : SiddhiConstants.SECOND_STREAM_PREFIX;
+ }
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
index 0c54f16..b8fd175 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
@@ -18,45 +18,39 @@
package org.apache.streampipes.wrapper.siddhi.engine.generator;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
import org.apache.streampipes.wrapper.siddhi.model.EventType;
import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.Map;
import java.util.StringJoiner;
public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
private static final Logger LOG = LoggerFactory.getLogger(SiddhiAppGenerator.class);
- private B params;
- private List<String> inputStreamNames;
- private Map<String, List<EventType>> eventTypes;
+ private final SiddhiProcessorParams<B> siddhiParams;
private final String fromStatement;
private final String selectStatement;
private final StringBuilder siddhiAppString;
- public SiddhiAppGenerator(B params,
- List<String> inputStreamNames,
- Map<String, List<EventType>> eventTypes,
+ public SiddhiAppGenerator(SiddhiProcessorParams<B> siddhiParams,
String fromStatement,
String selectStatement) {
- this.params = params;
- this.inputStreamNames = inputStreamNames;
- this.eventTypes = eventTypes;
+ this.siddhiParams = siddhiParams;
this.fromStatement = fromStatement;
this.selectStatement = selectStatement;
this.siddhiAppString = new StringBuilder();
}
public String generateSiddhiApp() {
- LOG.info("Configuring event types for graph " + params.getGraph().getName());
+ LOG.info("Configuring event types for graph " + this.siddhiParams.getParams().getGraph().getName());
- this.eventTypes.forEach(this::registerEventType);
- registerStatements(fromStatement, selectStatement, SiddhiUtils.getOutputTopicName(params));
+ this.siddhiParams.getEventTypeInfo().forEach(this::registerEventType);
+ registerStatements(fromStatement, selectStatement, SiddhiUtils.getOutputTopicName(this.siddhiParams.getParams()));
return this.siddhiAppString.toString();
}
@@ -66,7 +60,7 @@ public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
StringJoiner joiner = new StringJoiner(",");
eventSchema.forEach(typeInfo -> {
- joiner.add("s" + typeInfo.getStreamIdentifier() + typeInfo.getEventTypeName() + " " + typeInfo.getEventType());
+ joiner.add(typeInfo.getSelectorPrefix() + typeInfo.getFieldName() + " " + typeInfo.getFieldType());
});
this.siddhiAppString
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
new file mode 100644
index 0000000..9c25f15
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.engine.generator;
+
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
+import org.apache.streampipes.wrapper.siddhi.model.EventType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class SiddhiInvocationConfigGenerator<B extends EventProcessorBindingParams> {
+
+ private final String siddhiAppString;
+ private final String fromStatement;
+ private final String selectStatement;
+
+ private final SiddhiProcessorParams<B> siddhiProcessorParams;
+
+ public SiddhiInvocationConfigGenerator(B params,
+ Function<SiddhiProcessorParams<B>, String> fromStatementFunction,
+ Function<SiddhiProcessorParams<B>, String> selectStatementFunction) {
+ List<String> inputStreamNames = new InputStreamNameGenerator<>(params).generateInputStreamNames();
+ Map<String, List<EventType>> eventTypeInfo = new EventTypeGenerator<>(params).generateEventTypes();
+ List<String> outputEventKeys = new ArrayList<>(params.getOutEventType().keySet());
+ this.siddhiProcessorParams = new SiddhiProcessorParams<>(params, inputStreamNames, eventTypeInfo, outputEventKeys);
+ this.fromStatement = fromStatementFunction.apply(this.siddhiProcessorParams);
+ this.selectStatement = selectStatementFunction.apply(this.siddhiProcessorParams);
+ this.siddhiAppString = new SiddhiAppGenerator<>(siddhiProcessorParams, fromStatement, selectStatement)
+ .generateSiddhiApp();
+ }
+
+ public String getSiddhiAppString() {
+ return siddhiAppString;
+ }
+
+ public SiddhiProcessorParams<B> getSiddhiProcessorParams() {
+ return siddhiProcessorParams;
+ }
+
+ public String getFromStatement() {
+ return fromStatement;
+ }
+
+ public String getSelectStatement() {
+ return selectStatement;
+ }
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
index 1264caf..d1e324f 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
@@ -19,26 +19,25 @@ package org.apache.streampipes.wrapper.siddhi.model;
public class EventType {
- private Integer streamIdentifier;
- private String eventTypeName;
- private String eventType;
+ private String selectorPrefix;
+ private String fieldName;
+ private String fieldType;
- public EventType(Integer streamIdentifier, String eventTypeName, String eventType) {
- this.streamIdentifier = streamIdentifier;
- this.eventTypeName = eventTypeName;
- this.eventType = eventType;
+ public EventType(String selectorPrefix, String fieldName, String fieldType) {
+ this.selectorPrefix = selectorPrefix;
+ this.fieldName = fieldName;
+ this.fieldType = fieldType;
}
- public Integer getStreamIdentifier() {
- return streamIdentifier;
+ public String getSelectorPrefix() {
+ return selectorPrefix;
}
- public String getEventTypeName() {
- return eventTypeName;
+ public String getFieldName() {
+ return fieldName;
}
- public String getEventType() {
- return eventType;
+ public String getFieldType() {
+ return fieldType;
}
-
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
similarity index 51%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
index 585ac5a..0a2ea57 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/SiddhiProcessorParams.java
@@ -15,66 +15,46 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.model;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
-import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
-import org.apache.streampipes.wrapper.siddhi.engine.generator.InputStreamNameGenerator;
-import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.List;
-
-public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> implements
- EventProcessor<B>, SiddhiStatementGenerator<B> {
-
- private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);
-
- private SiddhiEngine siddhiEngine;
- private SiddhiInvocationConfig<B> siddhiConfig;
- private List<String> outputEventKeys = new ArrayList<>();
-
- public SiddhiEventEngine() {
- this.siddhiEngine = new SiddhiEngine();
- }
-
- public SiddhiEventEngine(SiddhiDebugCallback debugCallback) {
- this.siddhiEngine = new SiddhiEngine(debugCallback);
- }
-
- @Override
- public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
- List<String> inputStreamNames = new InputStreamNameGenerator<>(parameters).generateInputStreamNames();
- this.outputEventKeys = new ArrayList<>(parameters.getOutEventType().keySet());
- String fromStatement = fromStatement(inputStreamNames, parameters);
- String selectStatement = selectStatement(outputEventKeys, parameters);
- this.siddhiConfig = new SiddhiInvocationConfig<>(parameters, fromStatement, selectStatement, inputStreamNames);
- this.siddhiEngine.initializeEngine(this.siddhiConfig, spOutputCollector, runtimeContext);
+import java.util.Map;
+
+public class SiddhiProcessorParams<B extends EventProcessorBindingParams> {
+
+ private final B params;
+ private final List<String> inputStreamNames;
+ private final Map<String, List<EventType>> eventTypeInfo;
+ private final List<String> outputEventKeys;
+
+ public SiddhiProcessorParams(B params,
+ List<String> inputStreamNames,
+ Map<String, List<EventType>> eventTypeInfo,
+ List<String> outputEventKeys) {
+ this.params = params;
+ this.inputStreamNames = inputStreamNames;
+ this.eventTypeInfo = eventTypeInfo;
+ this.outputEventKeys = outputEventKeys;
}
- @Override
- public void onEvent(org.apache.streampipes.model.runtime.Event event, SpOutputCollector collector) {
- this.siddhiEngine.processEvent(event);
+ public B getParams() {
+ return params;
}
- @Override
- public void onDetach() {
- this.siddhiEngine.shutdownEngine();
+ public List<String> getInputStreamNames() {
+ return inputStreamNames;
}
- public SiddhiInvocationConfig<B> getSiddhiConfig() {
- return this.siddhiConfig;
+ public Map<String, List<EventType>> getEventTypeInfo() {
+ return eventTypeInfo;
}
- public String prepareName(String name) {
- return SiddhiUtils.prepareName(name);
+ public List<String> getOutputEventKeys() {
+ return outputEventKeys;
}
public String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java
similarity index 61%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java
index d7adcb9..c7fe3be 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/AbstractQueryGenerator.java
@@ -15,16 +15,23 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
-import java.util.List;
+import java.util.Arrays;
+import java.util.StringJoiner;
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+public abstract class AbstractQueryGenerator {
- String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+ public static String join(String... substrings) {
+ StringJoiner joiner = new StringJoiner(SiddhiConstants.WHITESPACE);
+ Arrays.stream(substrings).forEach(joiner::add);
- String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+ return joiner.toString();
+ }
+ public static String fromString(String query) {
+ return null;
+ }
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java
similarity index 66%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java
index d7adcb9..6667e89 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/FromClause.java
@@ -15,16 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.siddhi.engine;
-
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
-
- String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
- String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+package org.apache.streampipes.wrapper.siddhi.query;
+public class FromClause extends AbstractQueryGenerator {
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java
similarity index 64%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java
index d7adcb9..c126880 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SelectClause.java
@@ -15,16 +15,20 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
+import org.apache.streampipes.wrapper.siddhi.query.expression.Expression;
import java.util.List;
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+public class SelectClause extends AbstractQueryGenerator {
- String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
- String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+ public static String createWildcard() {
+ return join(SiddhiConstants.SELECT, SiddhiConstants.ASTERISK);
+ }
+ public static String create(List<Expression> outputEventProperties) {
+ return null;
+ }
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java
similarity index 58%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java
index 1264caf..44bfcf5 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/model/EventType.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/SiddhiStatement.java
@@ -15,30 +15,27 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.siddhi.model;
+package org.apache.streampipes.wrapper.siddhi.query;
-public class EventType {
+public class SiddhiStatement {
- private Integer streamIdentifier;
- private String eventTypeName;
- private String eventType;
+ private FromClause fromClause;
+ private SelectClause selectClause;
- public EventType(Integer streamIdentifier, String eventTypeName, String eventType) {
- this.streamIdentifier = streamIdentifier;
- this.eventTypeName = eventTypeName;
- this.eventType = eventType;
+ private SiddhiStatement(FromClause fromClause, SelectClause selectClause) {
+ this.fromClause = fromClause;
+ this.selectClause = selectClause;
}
- public Integer getStreamIdentifier() {
- return streamIdentifier;
+ public static SiddhiStatement from(FromClause fromClause, SelectClause selectClause) {
+ return new SiddhiStatement(fromClause, selectClause);
}
- public String getEventTypeName() {
- return eventTypeName;
+ public FromClause getFromClause() {
+ return fromClause;
}
- public String getEventType() {
- return eventType;
+ public SelectClause getSelectClause() {
+ return selectClause;
}
-
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java
similarity index 63%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java
index d7adcb9..aa4d10d 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expression.java
@@ -15,16 +15,21 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query.expression;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
-import java.util.List;
+import java.util.Arrays;
+import java.util.StringJoiner;
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
+public abstract class Expression {
- String fromStatement(List<String> inputStreamNames, final B bindingParameters);
+ public String join(String... substrings) {
+ StringJoiner joiner = new StringJoiner(SiddhiConstants.WHITESPACE);
+ Arrays.stream(substrings).forEach(joiner::add);
- String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+ return joiner.toString();
+ }
+ public abstract String toSiddhiEpl();
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
similarity index 66%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
index d7adcb9..cf600e1 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
@@ -15,16 +15,16 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.siddhi.engine;
+package org.apache.streampipes.wrapper.siddhi.query.expression;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+public class Expressions {
-import java.util.List;
-
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
-
- String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
- String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+ public static Expression property(String propertyName) {
+ //return new PropertyExpression(propertyName);
+ return null;
+ }
+ public static Expression property(String propertyName, String targetName) {
+ return null;
+ }
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
similarity index 66%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
index d7adcb9..ceade46 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
@@ -15,16 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.siddhi.engine;
-
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
-
- String fromStatement(List<String> inputStreamNames, final B bindingParameters);
-
- String selectStatement(List<String> outputEventKeys, final B bindingParameters);
+package org.apache.streampipes.wrapper.siddhi.query.expression;
+public abstract class PropertyExpression extends Expression {
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
index 2292f18..b5bc847 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
@@ -18,10 +18,12 @@
package org.apache.streampipes.wrapper.siddhi.utils;
import io.siddhi.core.event.Event;
+import io.siddhi.query.api.definition.Attribute;
import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
import java.util.HashMap;
import java.util.List;
@@ -29,18 +31,19 @@ import java.util.Map;
public class SiddhiUtils {
- public static org.apache.streampipes.model.runtime.Event toSpEvent(Event event, List<String> outputEventKeys, SchemaInfo
- schemaInfo, SourceInfo sourceInfo) {
+ public static org.apache.streampipes.model.runtime.Event toSpEvent(Event event,
+ SchemaInfo schemaInfo,
+ SourceInfo sourceInfo,
+ List<Attribute> streamAttributes) {
Map<String, Object> outMap = new HashMap<>();
- for (int i = 0; i < outputEventKeys.size(); i++) {
- if (event.getData(i) instanceof List) {
- List<Object> tmp = (List<Object>) event.getData(i);
- outMap.put(outputEventKeys.get(i), tmp.get(0));
- }
- else {
- outMap.put(outputEventKeys.get(i), event.getData(i));
+ for (int i = 0; i < streamAttributes.size(); i++) {
+ String outputKey = streamAttributes.get(i).getName();
+ if (outputKey.startsWith(SiddhiConstants.FIRST_STREAM_PREFIX) ||
+ outputKey.startsWith(SiddhiConstants.SECOND_STREAM_PREFIX)) {
+ outputKey = outputKey.substring(2);
}
+ outMap.put(outputKey, event.getData(i));
}
return EventFactory.fromMap(outMap, sourceInfo, schemaInfo);
}
@@ -54,6 +57,10 @@ public class SiddhiUtils {
return result;
}
+ public static String getPreparedOutputTopicName(EventProcessorBindingParams params) {
+ return prepareName(getOutputTopicName(params));
+ }
+
public static String getOutputTopicName(EventProcessorBindingParams parameters) {
return parameters
.getGraph()