You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/01/09 20:21:12 UTC
[incubator-streampipes] branch STREAMPIPES-272 updated:
[STREAMPIPES-272] Add initial version of StreamPipes client
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-272
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-272 by this push:
new b33e3f4 [STREAMPIPES-272] Add initial version of StreamPipes client
b33e3f4 is described below
commit b33e3f495d8377ded4a3a0b5c9ef6c4162b4a4e2
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Jan 9 21:20:37 2021 +0100
[STREAMPIPES-272] Add initial version of StreamPipes client
---
.../all_pipeline_elements_jvm.xml | 10 ++
streampipes-backend/pom.xml | 7 +-
.../backend/StreamPipesResourceConfig.java | 7 +-
streampipes-client/pom.xml | 38 +++++++
.../streampipes/client/StreamPipesClient.java | 105 ++++++++++++++++++
.../streampipes/client/StreamPipesCredentials.java | 28 +++--
.../streampipes/client/api/AbstractClientApi.java | 56 ++++++++++
.../org/apache/streampipes/client/api/CRUDApi.java | 22 ++--
.../apache/streampipes/client/api/DataSinkApi.java | 72 +++++++++++++
.../streampipes/client/api/DataStreamApi.java | 120 +++++++++++++++++++++
.../apache/streampipes/client/api/PipelineApi.java | 32 ++++--
.../client/api/PipelineElementTemplateApi.java | 63 +++++++++++
.../client/api/SupportsDataSinkApi.java | 16 +--
.../client/api/SupportsDataStreamApi.java | 16 +--
.../client/api/SupportsPipelineApi.java | 16 +--
.../api/SupportsPipelineElementTemplateApi.java | 16 +--
.../apache/streampipes/client/http/GetRequest.java | 59 ++++++++++
.../streampipes/client/http/HttpRequest.java | 97 +++++++++++++++++
.../streampipes/client/http/PostRequest.java | 57 ++++++++++
.../http/PostRequestWithPayloadResponse.java | 51 +++++++++
.../http/PostRequestWithoutPayloadResponse.java | 21 ++--
.../streampipes/client/http/header/Headers.java | 47 ++++++++
.../streampipes/client/live/EventProcessor.java | 16 +--
.../client/model/StreamPipesClientConfig.java | 76 +++++++++++++
.../client/util/StreamPipesApiPath.java | 59 ++++++++++
.../streampipes/client/TestStreamPipesClient.java | 77 +++++++++++++
.../extensions/ExtensionsResourceConfig.java | 2 +-
.../PipelineElementContainerResourceConfig.java | 2 +-
...Resource.java => PipelineTemplateResource.java} | 2 +-
.../model/template/PipelineElementTemplate.java | 2 +
.../manager/setup/CouchDbInstallationStep.java | 5 +-
.../template/PipelineElementTemplateVisitor.java | 2 +-
.../ps/PipelineElementTemplateResource.java | 15 ++-
33 files changed, 1095 insertions(+), 119 deletions(-)
diff --git a/.idea/runConfigurations/all_pipeline_elements_jvm.xml b/.idea/runConfigurations/all_pipeline_elements_jvm.xml
index f367750..e454222 100644
--- a/.idea/runConfigurations/all_pipeline_elements_jvm.xml
+++ b/.idea/runConfigurations/all_pipeline_elements_jvm.xml
@@ -15,6 +15,16 @@
</envs>
<option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
<module name="streampipes-pipeline-elements-all-jvm" />
+ <extension name="net.ashald.envfile">
+ <option name="IS_ENABLED" value="false" />
+ <option name="IS_SUBST" value="false" />
+ <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+ <option name="IS_IGNORE_MISSING_FILES" value="false" />
+ <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+ <ENTRIES>
+ <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+ </ENTRIES>
+ </extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
diff --git a/streampipes-backend/pom.xml b/streampipes-backend/pom.xml
index ee60bd4..cda7a9b 100644
--- a/streampipes-backend/pom.xml
+++ b/streampipes-backend/pom.xml
@@ -61,6 +61,11 @@
<artifactId>streampipes-connect-container-master</artifactId>
<version>0.68.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-platform-services</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
<!-- External dependencies -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
@@ -103,4 +108,4 @@
</plugins>
<finalName>streampipes-backend</finalName>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index cdb7577..763b73e 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -19,7 +19,7 @@
package org.apache.streampipes.backend;
import org.apache.streampipes.connect.container.master.rest.*;
-import org.apache.streampipes.container.api.PipelineElementTemplateResource;
+import org.apache.streampipes.ps.PipelineElementTemplateResource;
import org.apache.streampipes.rest.impl.*;
import org.apache.streampipes.rest.impl.dashboard.Dashboard;
import org.apache.streampipes.rest.impl.dashboard.DashboardWidget;
@@ -32,7 +32,10 @@ import org.apache.streampipes.rest.impl.nouser.FileServingResource;
import org.apache.streampipes.rest.impl.nouser.PipelineElementImportNoUser;
import org.apache.streampipes.rest.impl.nouser.PipelineNoUserResource;
import org.apache.streampipes.rest.serializer.JsonLdProvider;
-import org.apache.streampipes.rest.shared.serializer.*;
+import org.apache.streampipes.rest.shared.serializer.GsonClientModelProvider;
+import org.apache.streampipes.rest.shared.serializer.GsonWithIdProvider;
+import org.apache.streampipes.rest.shared.serializer.GsonWithoutIdProvider;
+import org.apache.streampipes.rest.shared.serializer.JacksonSerializationProvider;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.context.annotation.Configuration;
diff --git a/streampipes-client/pom.xml b/streampipes-client/pom.xml
index 22267f0..0ecf90d 100644
--- a/streampipes-client/pom.xml
+++ b/streampipes-client/pom.xml
@@ -11,5 +11,43 @@
<artifactId>streampipes-client</artifactId>
+ <dependencies>
+ <!-- StreamPipes Dependencies -->
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-dataformat-json</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-messaging-kafka</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-model</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-model-client</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-serializers-json</artifactId>
+ <version>0.68.0-SNAPSHOT</version>
+ </dependency>
+
+ <!-- 3rd party Dependencies -->
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>fluent-hc</artifactId>
+ </dependency>
+ </dependencies>
</project>
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
new file mode 100644
index 0000000..6b78413
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client;
+
+import org.apache.streampipes.client.api.*;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.dataformat.SpDataFormatFactory;
+
+public class StreamPipesClient implements SupportsPipelineApi,
+ SupportsPipelineElementTemplateApi,
+ SupportsDataSinkApi,
+ SupportsDataStreamApi {
+
+ private static final Integer SP_DEFAULT_PORT = 80;
+
+ private String streamPipesHost;
+ private Integer streamPipesPort;
+ private boolean httpsDisabled;
+
+ private StreamPipesClientConfig config;
+
+ public static StreamPipesClient create(String streamPipesHost,
+ StreamPipesCredentials credentials,
+ boolean httpsDisabled) {
+ return new StreamPipesClient(streamPipesHost, SP_DEFAULT_PORT, credentials, httpsDisabled);
+ }
+
+ public static StreamPipesClient create(String streamPipesHost,
+ StreamPipesCredentials credentials) {
+ return new StreamPipesClient(streamPipesHost, SP_DEFAULT_PORT, credentials, false);
+ }
+
+ public static StreamPipesClient create(String streamPipesHost,
+ Integer streamPipesPort,
+ StreamPipesCredentials credentials) {
+ return new StreamPipesClient(streamPipesHost, streamPipesPort, credentials, false);
+ }
+
+ public static StreamPipesClient create(String streamPipesHost,
+ Integer streamPipesPort,
+ StreamPipesCredentials credentials,
+ boolean httpsDisabled) {
+ return new StreamPipesClient(streamPipesHost, streamPipesPort, credentials, httpsDisabled);
+ }
+
+ private StreamPipesClient(String streamPipesHost,
+ Integer streamPipesPort,
+ StreamPipesCredentials credentials,
+ boolean httpsDisabled) {
+ this.streamPipesHost = streamPipesHost;
+ this.streamPipesPort = streamPipesPort;
+ this.config = new StreamPipesClientConfig(credentials, streamPipesHost, streamPipesPort, httpsDisabled);
+ }
+
+ public void registerDataFormat(SpDataFormatFactory spDataFormatFactory) {
+ this.config.addDataFormat(spDataFormatFactory);
+ }
+
+ public String getStreamPipesHost() {
+ return streamPipesHost;
+ }
+
+ public Integer getStreamPipesPort() {
+ return streamPipesPort;
+ }
+
+ public StreamPipesCredentials getCredentials() {
+ return config.getCredentials();
+ }
+
+ @Override
+ public PipelineApi pipelines() {
+ return new PipelineApi(config);
+ }
+
+ @Override
+ public PipelineElementTemplateApi pipelineElementTemplates() {
+ return new PipelineElementTemplateApi(config);
+ }
+
+ @Override
+ public DataSinkApi sinks() {
+ return new DataSinkApi(config);
+ }
+
+ @Override
+ public DataStreamApi streams() {
+ return new DataStreamApi(config);
+ }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesCredentials.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesCredentials.java
index 1dfb5f8..5fef5c3 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesCredentials.java
@@ -15,19 +15,27 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client;
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public class StreamPipesCredentials {
-import javax.ws.rs.Path;
-import java.util.Map;
+ private String username;
+ private String apiKey;
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+ public static StreamPipesCredentials from(String username, String apiKey) {
+ return new StreamPipesCredentials(username, apiKey);
+ }
+
+ private StreamPipesCredentials(String username, String apiKey) {
+ this.username = username;
+ this.apiKey = apiKey;
+ }
+
+ public String getUsername() {
+ return username;
+ }
- @Override
- protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
+ public String getApiKey() {
+ return apiKey;
}
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
new file mode 100644
index 0000000..23fde9b
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.http.GetRequest;
+import org.apache.streampipes.client.http.PostRequestWithPayloadResponse;
+import org.apache.streampipes.client.http.PostRequestWithoutPayloadResponse;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.util.List;
+
+public abstract class AbstractClientApi<T> {
+
+ protected StreamPipesClientConfig clientConfig;
+ private Class<T> targetClass;
+
+ public AbstractClientApi(StreamPipesClientConfig clientConfig, Class<T> targetClass) {
+ this.clientConfig = clientConfig;
+ this.targetClass = targetClass;
+ }
+
+ protected List<T> getAll(StreamPipesApiPath apiPath) throws SpRuntimeException {
+ return new GetRequest<>(clientConfig, apiPath, targetClass).receiveList();
+ }
+
+ protected T getSingle(StreamPipesApiPath apiPath) throws SpRuntimeException {
+ return new GetRequest<>(clientConfig, apiPath, targetClass).receiveSingle();
+ }
+
+ protected <O> O post(StreamPipesApiPath apiPath, T object, Class<O> responseClass) {
+ return new PostRequestWithPayloadResponse<>(clientConfig, apiPath, object, responseClass).postItem();
+ }
+
+ protected void post(StreamPipesApiPath apiPath, T object) {
+ new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, object).postItem();
+ }
+
+ protected abstract StreamPipesApiPath getBaseResourcePath();
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
index 1dfb5f8..43b8472 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
@@ -15,19 +15,19 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import java.util.List;
-import javax.ws.rs.Path;
-import java.util.Map;
+public interface CRUDApi<ID, T> {
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+ T get(ID id);
- @Override
- protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
- }
+ List<T> all();
+
+ void create(T element);
+
+ void delete(ID id);
+
+ void update(T element);
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
new file mode 100644
index 0000000..85e5538
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+
+import java.util.List;
+
+public class DataSinkApi extends AbstractClientApi<DataSinkInvocation> implements CRUDApi<String, DataSinkInvocation> {
+
+ public DataSinkApi(StreamPipesClientConfig clientConfig) {
+ super(clientConfig, DataSinkInvocation.class);
+ }
+
+ @Override
+ public DataSinkInvocation get(String s) {
+ return null;
+ }
+
+ @Override
+ public List<DataSinkInvocation> all() {
+ return getAll(getBaseResourcePath());
+ }
+
+ @Override
+ public void create(DataSinkInvocation element) {
+
+ }
+
+ @Override
+ public void delete(String s) {
+
+ }
+
+ @Override
+ public void update(DataSinkInvocation element) {
+
+ }
+
+ public DataSinkInvocation getDataSinkForPipelineElement(String templateId, DataSinkInvocation pipelineElement) {
+ StreamPipesApiPath path = StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+ .addToPath("pipeline-element-templates")
+ .addToPath(templateId)
+ .addToPath("sink");
+
+ return post(path, pipelineElement, DataSinkInvocation.class);
+ }
+
+ @Override
+ protected StreamPipesApiPath getBaseResourcePath() {
+ return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+ .addToPath("actions")
+ .addToPath("own");
+ }
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
new file mode 100644
index 0000000..dbcec49
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.live.EventProcessor;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.dataformat.SpDataFormatFactory;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+public class DataStreamApi extends AbstractClientApi<SpDataStream> implements CRUDApi<String, SpDataStream> {
+
+ public DataStreamApi(StreamPipesClientConfig clientConfig) {
+ super(clientConfig, SpDataStream.class);
+ }
+
+ @Override
+ public SpDataStream get(String s) {
+ return null;
+ }
+
+ @Override
+ public List<SpDataStream> all() {
+ return getAll(getBaseResourcePath());
+ }
+
+ @Override
+ public void create(SpDataStream element) {
+
+ }
+
+ @Override
+ public void delete(String s) {
+
+ }
+
+ @Override
+ public void update(SpDataStream element) {
+
+ }
+
+ public void subscribe(SpDataStream stream, EventProcessor callback) {
+ Optional<SpDataFormatFactory> formatConverterOpt = this
+ .clientConfig
+ .getRegisteredDataFormats()
+ .stream()
+ .filter(format -> stream
+ .getEventGrounding()
+ .getTransportFormats()
+ .get(0)
+ .getRdfType()
+ .stream()
+ .anyMatch(tf -> tf.toString().equals(format.getTransportFormatRdfUri())))
+ .findFirst();
+
+ if (formatConverterOpt.isPresent()) {
+ final SpDataFormatDefinition converter = formatConverterOpt.get().createInstance();
+ SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(getKafkaProtocol(stream), getOutputTopic(stream), new InternalEventProcessor<byte[]>() {
+ @Override
+ public void onEvent(byte[] event) {
+ try {
+ Event spEvent = EventFactory.fromMap(converter.toMap(event));
+ callback.onEvent(spEvent);
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ Thread t = new Thread(kafkaConsumer);
+ t.start();
+
+ } else {
+ throw new SpRuntimeException("No converter found for data format - did you add a format factory (client.registerDataFormat)?");
+ }
+ }
+
+ private KafkaTransportProtocol getKafkaProtocol(SpDataStream stream) {
+ return (KafkaTransportProtocol) stream.getEventGrounding().getTransportProtocol();
+ }
+
+ private String getOutputTopic(SpDataStream spDataStream) {
+ return spDataStream
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName();
+ }
+
+ @Override
+ protected StreamPipesApiPath getBaseResourcePath() {
+ return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+ .addToPath("streams")
+ .addToPath("own");
+ }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
similarity index 50%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
index 1dfb5f8..fe80ae3 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
@@ -15,19 +15,33 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.pipeline.Pipeline;
-import javax.ws.rs.Path;
-import java.util.Map;
+import java.util.List;
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+public class PipelineApi extends AbstractClientApi<Pipeline> {
+
+ public PipelineApi(StreamPipesClientConfig clientConfig) {
+ super(clientConfig, Pipeline.class);
+ }
+
+ public List<Pipeline> all() {
+ return getAll(getBaseResourcePath());
+ }
+
+ public Message start(String pipelineId) {
+ return null;
+ }
@Override
- protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
+ protected StreamPipesApiPath getBaseResourcePath() {
+ return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+ .addToPath("pipelines")
+ .addToPath("own");
}
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
new file mode 100644
index 0000000..9191b3e
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.template.PipelineElementTemplate;
+
+import java.util.List;
+
+public class PipelineElementTemplateApi extends AbstractClientApi<PipelineElementTemplate>
+ implements CRUDApi<String, PipelineElementTemplate> {
+
+ public PipelineElementTemplateApi(StreamPipesClientConfig clientConfig) {
+ super(clientConfig, PipelineElementTemplate.class);
+ }
+
+ @Override
+ public PipelineElementTemplate get(String id) {
+ return getSingle(getBaseResourcePath().addToPath(id));
+ }
+
+ @Override
+ public List<PipelineElementTemplate> all() {
+ return getAll(getBaseResourcePath());
+ }
+
+ @Override
+ public void create(PipelineElementTemplate element) {
+ post(getBaseResourcePath(), element);
+ }
+
+ @Override
+ public void delete(String s) {
+
+ }
+
+ @Override
+ public void update(PipelineElementTemplate element) {
+
+ }
+
+ @Override
+ protected StreamPipesApiPath getBaseResourcePath() {
+ return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+ .addToPath("pipeline-element-templates");
+ }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataSinkApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataSinkApi.java
index 1dfb5f8..7eb3d9e 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataSinkApi.java
@@ -15,19 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public interface SupportsDataSinkApi {
-import javax.ws.rs.Path;
-import java.util.Map;
-
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
- @Override
- protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
- }
+ DataSinkApi sinks();
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataStreamApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataStreamApi.java
index 1dfb5f8..e4ff907 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataStreamApi.java
@@ -15,19 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public interface SupportsDataStreamApi {
-import javax.ws.rs.Path;
-import java.util.Map;
-
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
- @Override
- protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
- }
+ DataStreamApi streams();
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java
index 1dfb5f8..7daf809 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java
@@ -15,19 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public interface SupportsPipelineApi {
-import javax.ws.rs.Path;
-import java.util.Map;
-
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
- @Override
- protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
- }
+ PipelineApi pipelines();
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineElementTemplateApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineElementTemplateApi.java
index 1dfb5f8..a9338a6 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineElementTemplateApi.java
@@ -15,19 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public interface SupportsPipelineElementTemplateApi {
-import javax.ws.rs.Path;
-import java.util.Map;
-
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
- @Override
- protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
- }
+ PipelineElementTemplateApi pipelineElementTemplates();
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/GetRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/GetRequest.java
new file mode 100644
index 0000000..47f936d
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/GetRequest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.io.IOException;
+import java.util.List;
+
+public class GetRequest<T> extends HttpRequest {
+
+ private Class<T> targetClass;
+
+ public GetRequest(StreamPipesClientConfig clientConfig,
+ StreamPipesApiPath apiPath,
+ Class<T> targetClass) {
+ super(clientConfig, apiPath);
+ this.targetClass = targetClass;
+ }
+
+ public T receiveSingle() throws SpRuntimeException {
+ return deserialize(executeRequest(), targetClass);
+ }
+
+ public List<T> receiveList() throws SpRuntimeException {
+ return deserializeList(executeRequest(), targetClass);
+ }
+
+ private String executeRequest() throws SpRuntimeException {
+ try {
+ return Request
+ .Get(makeUrl())
+ .setHeaders(standardHeaders())
+ .execute()
+ .returnContent()
+ .asString();
+ } catch (IOException e) {
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
new file mode 100644
index 0000000..f4ed4f7
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import org.apache.http.Header;
+import org.apache.streampipes.client.http.header.Headers;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class HttpRequest {
+
+ private StreamPipesClientConfig clientConfig;
+ private StreamPipesApiPath apiPath;
+ private ObjectMapper objectMapper;
+
+ public HttpRequest(StreamPipesClientConfig clientConfig, StreamPipesApiPath apiPath) {
+ this.clientConfig = clientConfig;
+ this.objectMapper = clientConfig.getSerializer();
+ this.apiPath = apiPath;
+ }
+
+ protected Header[] standardHeaders() {
+ List<Header> headers = new ArrayList<>();
+ headers.add(Headers.auth(clientConfig.getCredentials()));
+ headers.add(Headers.acceptJson());
+ return headers.toArray(new Header[0]);
+ }
+
+ protected Header[] standardPostHeaders() {
+ List<Header> headers = new ArrayList<>(Arrays.asList(standardHeaders()));
+ headers.add(Headers.contentTypeJson());
+ return headers.toArray(new Header[0]);
+ }
+
+ protected <T> String serialize(T element) {
+ try {
+ return objectMapper.writeValueAsString(element);
+ } catch (JsonProcessingException e) {
+ throw new SpRuntimeException(e.getCause());
+ }
+ }
+
+ protected <T> T deserialize(String json, Class<T> targetClass) throws SpRuntimeException {
+ try {
+ return objectMapper.readValue(json, targetClass);
+ } catch (JsonProcessingException e) {
+ throw new SpRuntimeException(e.getCause());
+ }
+ }
+
+ protected <T> List<T> deserializeList(String json, Class<T> targetClass) throws SpRuntimeException {
+ CollectionType listType = objectMapper.getTypeFactory()
+ .constructCollectionType(List.class, targetClass);
+ try {
+ return objectMapper.readValue(json, listType);
+ } catch (JsonProcessingException e) {
+ throw new SpRuntimeException(e.getCause());
+ }
+ }
+
+ protected String makeUrl() {
+ return makeProtocol() + clientConfig.getStreamPipesHost()
+ + ":"
+ + clientConfig.getStreamPipesPort()
+ + "/"
+ + apiPath.toString();
+ }
+
+ private String makeProtocol() {
+ return clientConfig.isHttpsDisabled() ? "http://" : "https://";
+ }
+
+
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequest.java
new file mode 100644
index 0000000..3f9d489
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.io.IOException;
+
+public abstract class PostRequest<T, O> extends HttpRequest {
+
+ protected T element;
+
+ public PostRequest(StreamPipesClientConfig clientConfig,
+ StreamPipesApiPath apiPath,
+ T element) {
+ super(clientConfig, apiPath);
+ this.element = element;
+ }
+
+ public abstract O postItem();
+
+ protected Response executeRequest() throws SpRuntimeException {
+ try {
+ return Request
+ .Post(makeUrl())
+ .setHeaders(standardPostHeaders())
+ .bodyString(serialize(element), ContentType.APPLICATION_JSON)
+ .execute();
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+
+
+
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithPayloadResponse.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithPayloadResponse.java
new file mode 100644
index 0000000..3b68102
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithPayloadResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.io.IOException;
+
+public class PostRequestWithPayloadResponse<T, O> extends PostRequest<T, O> {
+
+ private Class<O> responseClass;
+
+ public PostRequestWithPayloadResponse(StreamPipesClientConfig clientConfig,
+ StreamPipesApiPath apiPath,
+ T element,
+ Class<O> responseClass) {
+ super(clientConfig, apiPath, element);
+ this.responseClass = responseClass;
+ }
+
+ public O postItem() throws SpRuntimeException {
+ return deserialize(executeAndReturnContent(), responseClass);
+ }
+
+ private String executeAndReturnContent() {
+ try {
+ return executeRequest().returnContent().asString();
+ } catch (IOException e) {
+ throw new SpRuntimeException(e.getMessage());
+ }
+ }
+
+
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithoutPayloadResponse.java
similarity index 58%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithoutPayloadResponse.java
index 1dfb5f8..51f54a7 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithoutPayloadResponse.java
@@ -15,19 +15,22 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.http;
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
-import javax.ws.rs.Path;
-import java.util.Map;
+public class PostRequestWithoutPayloadResponse<T> extends PostRequest<T, Void> {
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+ public PostRequestWithoutPayloadResponse(StreamPipesClientConfig clientConfig,
+ StreamPipesApiPath apiPath,
+ T element) {
+ super(clientConfig, apiPath, element);
+ }
@Override
- protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
+ public Void postItem() {
+ executeRequest();
+ return null;
}
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/header/Headers.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/header/Headers.java
new file mode 100644
index 0000000..58709df
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/header/Headers.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http.header;
+
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.apache.streampipes.client.StreamPipesCredentials;
+
+public class Headers {
+
+ private static final String AUTHORIZATION = "Authorization";
+ private static final String ACCEPT = "Accept";
+ private static final String CONTENT_TYPE = "Content-type";
+ private static final String BEARER = "Bearer ";
+ private static final String APPLICATION_JSON_TYPE = "application/json";
+
+ public static Header auth(StreamPipesCredentials credentials) {
+ return makeHeader(AUTHORIZATION, BEARER + credentials.getApiKey());
+ }
+
+ public static Header acceptJson() {
+ return makeHeader(ACCEPT, APPLICATION_JSON_TYPE);
+ }
+
+ private static Header makeHeader(String name, String value) {
+ return new BasicHeader(name, value);
+ }
+
+ public static Header contentTypeJson() {
+ return makeHeader(CONTENT_TYPE, APPLICATION_JSON_TYPE);
+ }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/live/EventProcessor.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/live/EventProcessor.java
index 1dfb5f8..2af4cb9 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/live/EventProcessor.java
@@ -15,19 +15,11 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.live;
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.model.runtime.Event;
-import javax.ws.rs.Path;
-import java.util.Map;
+public interface EventProcessor {
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
- @Override
- protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
- return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
- }
+ void onEvent(Event event);
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java
new file mode 100644
index 0000000..b587f0d
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.model;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.dataformat.SpDataFormatFactory;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamPipesClientConfig {
+
+ private StreamPipesCredentials credentials;
+ private String streamPipesHost;
+ private Integer streamPipesPort;
+ private ObjectMapper serializer;
+ private boolean httpsDisabled;
+ private List<SpDataFormatFactory> registeredDataFormats;
+
+ public StreamPipesClientConfig(StreamPipesCredentials credentials,
+ String streamPipesHost,
+ Integer streamPipesPort,
+ boolean httpsDisabled) {
+ this.credentials = credentials;
+ this.streamPipesHost = streamPipesHost;
+ this.streamPipesPort = streamPipesPort;
+ this.httpsDisabled = httpsDisabled;
+ this.serializer = JacksonSerializer.getObjectMapper();
+ this.registeredDataFormats = new ArrayList<>();
+ }
+
+ public StreamPipesCredentials getCredentials() {
+ return credentials;
+ }
+
+ public String getStreamPipesHost() {
+ return streamPipesHost;
+ }
+
+ public Integer getStreamPipesPort() {
+ return streamPipesPort;
+ }
+
+ public ObjectMapper getSerializer() {
+ return serializer;
+ }
+
+ public boolean isHttpsDisabled() {
+ return httpsDisabled;
+ }
+
+ public void addDataFormat(SpDataFormatFactory spDataFormatFactory) {
+ this.registeredDataFormats.add(spDataFormatFactory);
+ }
+
+ public List<SpDataFormatFactory> getRegisteredDataFormats() {
+ return registeredDataFormats;
+ }
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java b/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java
new file mode 100644
index 0000000..c3764af
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.util;
+
+import org.apache.streampipes.client.StreamPipesCredentials;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.StringJoiner;
+
+public class StreamPipesApiPath {
+
+ private List<String> pathItems;
+ private static final List<String> BaseApiPathV2 = Arrays.asList("streampipes-backend", "api", "v2");
+
+ public static StreamPipesApiPath fromBaseApiPath() {
+ return new StreamPipesApiPath(BaseApiPathV2);
+ }
+
+ public static StreamPipesApiPath fromUserApiPath(StreamPipesCredentials credentials) {
+ List<String> authPath = new ArrayList<>();
+ authPath.addAll(BaseApiPathV2);
+ authPath.addAll(Arrays.asList("users", credentials.getUsername()));
+ return new StreamPipesApiPath(authPath);
+ }
+
+
+ private StreamPipesApiPath(List<String> initialPathItems) {
+ this.pathItems = initialPathItems;
+ }
+
+ public StreamPipesApiPath addToPath(String pathItem) {
+ this.pathItems.add(pathItem);
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ StringJoiner joiner = new StringJoiner("/");
+ pathItems.forEach(joiner::add);
+ return joiner.toString();
+ }
+}
diff --git a/streampipes-client/src/test/java/org/apache/streampipes/client/TestStreamPipesClient.java b/streampipes-client/src/test/java/org/apache/streampipes/client/TestStreamPipesClient.java
new file mode 100644
index 0000000..74e622e
--- /dev/null
+++ b/streampipes-client/src/test/java/org/apache/streampipes/client/TestStreamPipesClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.template.PipelineElementTemplate;
+import org.apache.streampipes.model.template.PipelineElementTemplateConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestStreamPipesClient {
+
+ public static void main(String[] args) {
+ StreamPipesCredentials credentials = StreamPipesCredentials
+ .from(System.getenv("user"), System.getenv("apiKey"));
+
+ StreamPipesClient client = StreamPipesClient
+ .create("localhost", 8082, credentials, true);
+
+ List<Pipeline> pipelines = client.pipelines().all();
+
+ pipelines.forEach(pipeline -> System.out.println(pipeline.getName()));
+
+ PipelineElementTemplate template = new PipelineElementTemplate();
+ template.setTemplateName("Test");
+ template.setTemplateDescription("description");
+ template.setBasePipelineElementAppId("org.apache.streampipes.sinks.internal.jvm.dashboard");
+ Map<String, PipelineElementTemplateConfig> configs = new HashMap<>();
+ PipelineElementTemplateConfig config = new PipelineElementTemplateConfig();
+ config.setValue("test");
+ configs.put("visualization-name", config);
+ template.setTemplateConfigs(configs);
+
+ //client.pipelineElementTemplates().create(template);
+
+ List<PipelineElementTemplate> templates = client.pipelineElementTemplates().all();
+
+ templates.forEach(t -> System.out.println(t.getTemplateName()));
+
+ List<DataSinkInvocation> dataSinks = client.sinks().all();
+
+// System.out.println(dataSinks.size());
+// System.out.println(template.getCouchDbId());
+// DataSinkInvocation invocation = client.sinks().getDataSinkForPipelineElement(templates.get(0).getCouchDbId(), dataSinks.get(0));
+// System.out.println(invocation.getName());
+
+ List<SpDataStream> dataStreams = client.streams().all();
+ System.out.println(dataStreams.size());
+
+ //SpDataStream myStream = dataStreams.stream().filter(stream -> stream.getCorrespondingAdapterId().equals("org.apache.streampipes.connect.adapters.simulator.randomdatastream")).findFirst().get();
+
+ client.registerDataFormat(new JsonDataFormatFactory());
+
+ client.streams().subscribe(dataStreams.get(1), event -> MapUtils.debugPrint(System.out, "event", event.getRaw()));
+ }
+}
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
index 7a267a7..cd27658 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
@@ -31,7 +31,7 @@ public class ExtensionsResourceConfig extends ResourceConfig {
register(DataProcessorPipelineElementResource.class);
register(DataStreamPipelineElementResource.class);
register(WelcomePage.class);
- register(PipelineElementTemplateResource.class);
+ register(PipelineTemplateResource.class);
//register(WelcomePageWorker.class);
register(GuessResource.class);
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
index 920c11d..1ef0379 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
@@ -30,7 +30,7 @@ public class PipelineElementContainerResourceConfig extends ResourceConfig {
register(DataProcessorPipelineElementResource.class);
register(DataStreamPipelineElementResource.class);
register(WelcomePage.class);
- register(PipelineElementTemplateResource.class);
+ register(PipelineTemplateResource.class);
register(JacksonSerializationProvider.class);
}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineTemplateResource.java
similarity index 91%
rename from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
rename to streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineTemplateResource.java
index 1dfb5f8..0a930ae 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineTemplateResource.java
@@ -24,7 +24,7 @@ import javax.ws.rs.Path;
import java.util.Map;
@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+public class PipelineTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
@Override
protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplate.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplate.java
index 8a5507f..96c1078 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplate.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplate.java
@@ -22,6 +22,7 @@ import com.google.gson.annotations.SerializedName;
import org.apache.streampipes.model.shared.annotation.TsModel;
import java.util.Map;
+import java.util.UUID;
@TsModel
public class PipelineElementTemplate {
@@ -47,6 +48,7 @@ public class PipelineElementTemplate {
}
public PipelineElementTemplate() {
+ this.couchDbId = UUID.randomUUID().toString();
}
public String getTemplateName() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index d522345..a07e50c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.manager.setup;
-import org.apache.streampipes.container.util.Util;
import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.Notifications;
@@ -154,8 +153,12 @@ public class CouchDbInstallationStep implements InstallationStep {
MapReduce usernameFunction = new MapReduce();
usernameFunction.setMap("function(doc) { if(doc.email) { emit(doc.email, doc); } }");
+ MapReduce tokenFunction = new MapReduce();
+ tokenFunction.setMap("function(doc) { if (doc.userApiTokens) { doc.userApiTokens.forEach(function(token) { emit(token.hashedToken, doc.email); });}}");
+
views.put("password", passwordFunction);
views.put("username", usernameFunction);
+ views.put("token", tokenFunction);
userDocument.setViews(views);
Response resp = Utils.getCouchDbUserClient().design().synchronizeWithDb(userDocument);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
index 4cea7b4..fc55d28 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
@@ -114,7 +114,7 @@ public class PipelineElementTemplateVisitor implements StaticPropertyVisitor {
}
private Object getValue(String internalName) {
- return configs.get(internalName);
+ return configs.get(internalName).getValue();
}
private boolean hasKey(String internalName) {
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/PipelineElementTemplateResource.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/PipelineElementTemplateResource.java
index a922f37..4b86018 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/PipelineElementTemplateResource.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/PipelineElementTemplateResource.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.template.PipelineElementTemplate;
import org.apache.streampipes.rest.impl.AbstractRestInterface;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-import retrofit2.http.Query;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
@@ -37,7 +36,7 @@ public class PipelineElementTemplateResource extends AbstractRestInterface {
@Produces(MediaType.APPLICATION_JSON)
@JacksonSerialized
public Response getAll(@QueryParam("appId") String appId) {
- if (appId != null) {
+ if (appId == null) {
return ok(getPipelineElementTemplateStorage().getAll());
} else {
return ok(getPipelineElementTemplateStorage().getPipelineElementTemplatesforAppId(appId));
@@ -56,7 +55,7 @@ public class PipelineElementTemplateResource extends AbstractRestInterface {
}
}
- @GET
+ @POST
@Consumes(MediaType.APPLICATION_JSON)
@JacksonSerialized
public Response create(PipelineElementTemplate entity) {
@@ -95,23 +94,23 @@ public class PipelineElementTemplateResource extends AbstractRestInterface {
@Consumes(MediaType.APPLICATION_JSON)
@JacksonSerialized
public Response getPipelineElementForTemplate(@PathParam("id") String id,
- @QueryParam("overwriteNames") boolean overwriteNameAndDescription,
+ @QueryParam("overwriteNames") String overwriteNameAndDescription,
DataSinkInvocation invocation) {
PipelineElementTemplate template = getPipelineElementTemplateStorage().getElementById(id);
- return ok(new DataSinkTemplateHandler(template, invocation, overwriteNameAndDescription)
+ return ok(new DataSinkTemplateHandler(template, invocation, Boolean.parseBoolean(overwriteNameAndDescription))
.applyTemplateOnPipelineElement());
}
@POST
- @Path("{id}/sink")
+ @Path("{id}/processor")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@JacksonSerialized
public Response getPipelineElementForTemplate(@PathParam("id") String id,
- @Query("overwriteNames") boolean overwriteNameAndDescription,
+ @QueryParam("overwriteNames") String overwriteNameAndDescription,
DataProcessorInvocation invocation) {
PipelineElementTemplate template = getPipelineElementTemplateStorage().getElementById(id);
- return ok(new DataProcessorTemplateHandler(template, invocation, overwriteNameAndDescription)
+ return ok(new DataProcessorTemplateHandler(template, invocation, Boolean.parseBoolean(overwriteNameAndDescription))
.applyTemplateOnPipelineElement());
}