You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/01/09 20:21:12 UTC

[incubator-streampipes] branch STREAMPIPES-272 updated: [STREAMPIPES-272] Add initial version of StreamPipes client

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-272
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-272 by this push:
     new b33e3f4  [STREAMPIPES-272] Add initial version of StreamPipes client
b33e3f4 is described below

commit b33e3f495d8377ded4a3a0b5c9ef6c4162b4a4e2
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Jan 9 21:20:37 2021 +0100

    [STREAMPIPES-272] Add initial version of StreamPipes client
---
 .../all_pipeline_elements_jvm.xml                  |  10 ++
 streampipes-backend/pom.xml                        |   7 +-
 .../backend/StreamPipesResourceConfig.java         |   7 +-
 streampipes-client/pom.xml                         |  38 +++++++
 .../streampipes/client/StreamPipesClient.java      | 105 ++++++++++++++++++
 .../streampipes/client/StreamPipesCredentials.java |  28 +++--
 .../streampipes/client/api/AbstractClientApi.java  |  56 ++++++++++
 .../org/apache/streampipes/client/api/CRUDApi.java |  22 ++--
 .../apache/streampipes/client/api/DataSinkApi.java |  72 +++++++++++++
 .../streampipes/client/api/DataStreamApi.java      | 120 +++++++++++++++++++++
 .../apache/streampipes/client/api/PipelineApi.java |  32 ++++--
 .../client/api/PipelineElementTemplateApi.java     |  63 +++++++++++
 .../client/api/SupportsDataSinkApi.java            |  16 +--
 .../client/api/SupportsDataStreamApi.java          |  16 +--
 .../client/api/SupportsPipelineApi.java            |  16 +--
 .../api/SupportsPipelineElementTemplateApi.java    |  16 +--
 .../apache/streampipes/client/http/GetRequest.java |  59 ++++++++++
 .../streampipes/client/http/HttpRequest.java       |  97 +++++++++++++++++
 .../streampipes/client/http/PostRequest.java       |  57 ++++++++++
 .../http/PostRequestWithPayloadResponse.java       |  51 +++++++++
 .../http/PostRequestWithoutPayloadResponse.java    |  21 ++--
 .../streampipes/client/http/header/Headers.java    |  47 ++++++++
 .../streampipes/client/live/EventProcessor.java    |  16 +--
 .../client/model/StreamPipesClientConfig.java      |  76 +++++++++++++
 .../client/util/StreamPipesApiPath.java            |  59 ++++++++++
 .../streampipes/client/TestStreamPipesClient.java  |  77 +++++++++++++
 .../extensions/ExtensionsResourceConfig.java       |   2 +-
 .../PipelineElementContainerResourceConfig.java    |   2 +-
 ...Resource.java => PipelineTemplateResource.java} |   2 +-
 .../model/template/PipelineElementTemplate.java    |   2 +
 .../manager/setup/CouchDbInstallationStep.java     |   5 +-
 .../template/PipelineElementTemplateVisitor.java   |   2 +-
 .../ps/PipelineElementTemplateResource.java        |  15 ++-
 33 files changed, 1095 insertions(+), 119 deletions(-)

diff --git a/.idea/runConfigurations/all_pipeline_elements_jvm.xml b/.idea/runConfigurations/all_pipeline_elements_jvm.xml
index f367750..e454222 100644
--- a/.idea/runConfigurations/all_pipeline_elements_jvm.xml
+++ b/.idea/runConfigurations/all_pipeline_elements_jvm.xml
@@ -15,6 +15,16 @@
     </envs>
     <option name="MAIN_CLASS_NAME" value="org.apache.streampipes.pe.jvm.AllPipelineElementsInit" />
     <module name="streampipes-pipeline-elements-all-jvm" />
+    <extension name="net.ashald.envfile">
+      <option name="IS_ENABLED" value="false" />
+      <option name="IS_SUBST" value="false" />
+      <option name="IS_PATH_MACRO_SUPPORTED" value="false" />
+      <option name="IS_IGNORE_MISSING_FILES" value="false" />
+      <option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
+      <ENTRIES>
+        <ENTRY IS_ENABLED="true" PARSER="runconfig" />
+      </ENTRIES>
+    </extension>
     <method v="2">
       <option name="Make" enabled="true" />
     </method>
diff --git a/streampipes-backend/pom.xml b/streampipes-backend/pom.xml
index ee60bd4..cda7a9b 100644
--- a/streampipes-backend/pom.xml
+++ b/streampipes-backend/pom.xml
@@ -61,6 +61,11 @@
             <artifactId>streampipes-connect-container-master</artifactId>
             <version>0.68.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-platform-services</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
         <!-- External dependencies -->
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
@@ -103,4 +108,4 @@
         </plugins>
         <finalName>streampipes-backend</finalName>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index cdb7577..763b73e 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.backend;
 
 import org.apache.streampipes.connect.container.master.rest.*;
-import org.apache.streampipes.container.api.PipelineElementTemplateResource;
+import org.apache.streampipes.ps.PipelineElementTemplateResource;
 import org.apache.streampipes.rest.impl.*;
 import org.apache.streampipes.rest.impl.dashboard.Dashboard;
 import org.apache.streampipes.rest.impl.dashboard.DashboardWidget;
@@ -32,7 +32,10 @@ import org.apache.streampipes.rest.impl.nouser.FileServingResource;
 import org.apache.streampipes.rest.impl.nouser.PipelineElementImportNoUser;
 import org.apache.streampipes.rest.impl.nouser.PipelineNoUserResource;
 import org.apache.streampipes.rest.serializer.JsonLdProvider;
-import org.apache.streampipes.rest.shared.serializer.*;
+import org.apache.streampipes.rest.shared.serializer.GsonClientModelProvider;
+import org.apache.streampipes.rest.shared.serializer.GsonWithIdProvider;
+import org.apache.streampipes.rest.shared.serializer.GsonWithoutIdProvider;
+import org.apache.streampipes.rest.shared.serializer.JacksonSerializationProvider;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.springframework.context.annotation.Configuration;
diff --git a/streampipes-client/pom.xml b/streampipes-client/pom.xml
index 22267f0..0ecf90d 100644
--- a/streampipes-client/pom.xml
+++ b/streampipes-client/pom.xml
@@ -11,5 +11,43 @@
 
     <artifactId>streampipes-client</artifactId>
 
+    <dependencies>
+        <!-- StreamPipes Dependencies -->
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-dataformat-json</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-messaging-kafka</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-model</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-model-client</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-serializers-json</artifactId>
+            <version>0.68.0-SNAPSHOT</version>
+        </dependency>
+
+        <!-- 3rd party Dependencies -->
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>fluent-hc</artifactId>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
new file mode 100644
index 0000000..6b78413
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client;
+
+import org.apache.streampipes.client.api.*;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.dataformat.SpDataFormatFactory;
+
+public class StreamPipesClient implements SupportsPipelineApi,
+        SupportsPipelineElementTemplateApi,
+        SupportsDataSinkApi,
+        SupportsDataStreamApi {
+
+  private static final Integer SP_DEFAULT_PORT = 80;
+
+  private String streamPipesHost;
+  private Integer streamPipesPort;
+  private boolean httpsDisabled;
+
+  private StreamPipesClientConfig config;
+
+  public static StreamPipesClient create(String streamPipesHost,
+                                         StreamPipesCredentials credentials,
+                                         boolean httpsDisabled) {
+    return new StreamPipesClient(streamPipesHost, SP_DEFAULT_PORT, credentials, httpsDisabled);
+  }
+
+  public static StreamPipesClient create(String streamPipesHost,
+                                         StreamPipesCredentials credentials) {
+    return new StreamPipesClient(streamPipesHost, SP_DEFAULT_PORT, credentials, false);
+  }
+
+  public static StreamPipesClient create(String streamPipesHost,
+                                         Integer streamPipesPort,
+                                         StreamPipesCredentials credentials) {
+    return new StreamPipesClient(streamPipesHost, streamPipesPort, credentials, false);
+  }
+
+  public static StreamPipesClient create(String streamPipesHost,
+                                         Integer streamPipesPort,
+                                         StreamPipesCredentials credentials,
+                                         boolean httpsDisabled) {
+    return new StreamPipesClient(streamPipesHost, streamPipesPort, credentials, httpsDisabled);
+  }
+
+  private StreamPipesClient(String streamPipesHost,
+                            Integer streamPipesPort,
+                            StreamPipesCredentials credentials,
+                            boolean httpsDisabled) {
+    this.streamPipesHost = streamPipesHost;
+    this.streamPipesPort = streamPipesPort;
+    this.config = new StreamPipesClientConfig(credentials, streamPipesHost, streamPipesPort, httpsDisabled);
+  }
+
+  public void registerDataFormat(SpDataFormatFactory spDataFormatFactory) {
+    this.config.addDataFormat(spDataFormatFactory);
+  }
+
+  public String getStreamPipesHost() {
+    return streamPipesHost;
+  }
+
+  public Integer getStreamPipesPort() {
+    return streamPipesPort;
+  }
+
+  public StreamPipesCredentials getCredentials() {
+    return config.getCredentials();
+  }
+
+  @Override
+  public PipelineApi pipelines() {
+    return new PipelineApi(config);
+  }
+
+  @Override
+  public PipelineElementTemplateApi pipelineElementTemplates() {
+    return new PipelineElementTemplateApi(config);
+  }
+
+  @Override
+  public DataSinkApi sinks() {
+    return new DataSinkApi(config);
+  }
+
+  @Override
+  public DataStreamApi streams() {
+    return new DataStreamApi(config);
+  }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesCredentials.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesCredentials.java
index 1dfb5f8..5fef5c3 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesCredentials.java
@@ -15,19 +15,27 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client;
 
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public class StreamPipesCredentials {
 
-import javax.ws.rs.Path;
-import java.util.Map;
+  private String username;
+  private String apiKey;
 
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+  public static StreamPipesCredentials from(String username, String apiKey) {
+    return new StreamPipesCredentials(username, apiKey);
+  }
+
+  private StreamPipesCredentials(String username, String apiKey) {
+    this.username = username;
+    this.apiKey = apiKey;
+  }
+
+  public String getUsername() {
+    return username;
+  }
 
-  @Override
-  protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
-    return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
+  public String getApiKey() {
+    return apiKey;
   }
 }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
new file mode 100644
index 0000000..23fde9b
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.http.GetRequest;
+import org.apache.streampipes.client.http.PostRequestWithPayloadResponse;
+import org.apache.streampipes.client.http.PostRequestWithoutPayloadResponse;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.util.List;
+
+public abstract class AbstractClientApi<T> {
+
+  protected StreamPipesClientConfig clientConfig;
+  private Class<T> targetClass;
+
+  public AbstractClientApi(StreamPipesClientConfig clientConfig, Class<T> targetClass) {
+    this.clientConfig = clientConfig;
+    this.targetClass = targetClass;
+  }
+
+  protected List<T> getAll(StreamPipesApiPath apiPath) throws SpRuntimeException {
+    return new GetRequest<>(clientConfig, apiPath, targetClass).receiveList();
+  }
+
+  protected T getSingle(StreamPipesApiPath apiPath) throws SpRuntimeException {
+    return new GetRequest<>(clientConfig, apiPath, targetClass).receiveSingle();
+  }
+
+  protected <O> O post(StreamPipesApiPath apiPath, T object, Class<O> responseClass) {
+    return new PostRequestWithPayloadResponse<>(clientConfig, apiPath, object, responseClass).postItem();
+  }
+
+  protected void post(StreamPipesApiPath apiPath, T object) {
+    new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, object).postItem();
+  }
+
+  protected abstract StreamPipesApiPath getBaseResourcePath();
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
index 1dfb5f8..43b8472 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/CRUDApi.java
@@ -15,19 +15,19 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
 
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import java.util.List;
 
-import javax.ws.rs.Path;
-import java.util.Map;
+public interface CRUDApi<ID, T> {
 
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+  T get(ID id);
 
-  @Override
-  protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
-    return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
-  }
+  List<T> all();
+
+  void create(T element);
+
+  void delete(ID id);
+
+  void update(T element);
 }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
new file mode 100644
index 0000000..85e5538
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+
+import java.util.List;
+
+public class DataSinkApi extends AbstractClientApi<DataSinkInvocation> implements CRUDApi<String, DataSinkInvocation> {
+
+  public DataSinkApi(StreamPipesClientConfig clientConfig) {
+    super(clientConfig, DataSinkInvocation.class);
+  }
+
+  @Override
+  public DataSinkInvocation get(String s) {
+    return null;
+  }
+
+  @Override
+  public List<DataSinkInvocation> all() {
+    return getAll(getBaseResourcePath());
+  }
+
+  @Override
+  public void create(DataSinkInvocation element) {
+
+  }
+
+  @Override
+  public void delete(String s) {
+
+  }
+
+  @Override
+  public void update(DataSinkInvocation element) {
+
+  }
+
+  public DataSinkInvocation getDataSinkForPipelineElement(String templateId, DataSinkInvocation pipelineElement) {
+    StreamPipesApiPath path = StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+            .addToPath("pipeline-element-templates")
+            .addToPath(templateId)
+            .addToPath("sink");
+
+    return post(path, pipelineElement, DataSinkInvocation.class);
+  }
+
+  @Override
+  protected StreamPipesApiPath getBaseResourcePath() {
+    return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+            .addToPath("actions")
+            .addToPath("own");
+  }
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
new file mode 100644
index 0000000..dbcec49
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.live.EventProcessor;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.dataformat.SpDataFormatFactory;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+public class DataStreamApi extends AbstractClientApi<SpDataStream> implements CRUDApi<String, SpDataStream> {
+
+  public DataStreamApi(StreamPipesClientConfig clientConfig) {
+    super(clientConfig, SpDataStream.class);
+  }
+
+  @Override
+  public SpDataStream get(String s) {
+    return null;
+  }
+
+  @Override
+  public List<SpDataStream> all() {
+    return getAll(getBaseResourcePath());
+  }
+
+  @Override
+  public void create(SpDataStream element) {
+
+  }
+
+  @Override
+  public void delete(String s) {
+
+  }
+
+  @Override
+  public void update(SpDataStream element) {
+
+  }
+
+  public void subscribe(SpDataStream stream, EventProcessor callback) {
+    Optional<SpDataFormatFactory> formatConverterOpt = this
+            .clientConfig
+            .getRegisteredDataFormats()
+            .stream()
+            .filter(format -> stream
+                    .getEventGrounding()
+                    .getTransportFormats()
+                    .get(0)
+                    .getRdfType()
+                    .stream()
+                    .anyMatch(tf -> tf.toString().equals(format.getTransportFormatRdfUri())))
+            .findFirst();
+
+    if (formatConverterOpt.isPresent()) {
+      final SpDataFormatDefinition converter = formatConverterOpt.get().createInstance();
+      SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(getKafkaProtocol(stream), getOutputTopic(stream), new InternalEventProcessor<byte[]>() {
+        @Override
+        public void onEvent(byte[] event) {
+          try {
+            Event spEvent = EventFactory.fromMap(converter.toMap(event));
+            callback.onEvent(spEvent);
+          } catch (SpRuntimeException e) {
+            e.printStackTrace();
+          }
+        }
+      });
+      Thread t = new Thread(kafkaConsumer);
+      t.start();
+
+    } else {
+      throw new SpRuntimeException("No converter found for data format - did you add a format factory (client.registerDataFormat)?");
+    }
+  }
+
+  private KafkaTransportProtocol getKafkaProtocol(SpDataStream stream) {
+    return (KafkaTransportProtocol) stream.getEventGrounding().getTransportProtocol();
+  }
+
+  private String getOutputTopic(SpDataStream spDataStream) {
+    return spDataStream
+            .getEventGrounding()
+            .getTransportProtocol()
+            .getTopicDefinition()
+            .getActualTopicName();
+  }
+
+  @Override
+  protected StreamPipesApiPath getBaseResourcePath() {
+    return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+            .addToPath("streams")
+            .addToPath("own");
+  }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
similarity index 50%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
index 1dfb5f8..fe80ae3 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
@@ -15,19 +15,33 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
 
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.message.Message;
+import org.apache.streampipes.model.pipeline.Pipeline;
 
-import javax.ws.rs.Path;
-import java.util.Map;
+import java.util.List;
 
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+public class PipelineApi extends AbstractClientApi<Pipeline> {
+
+  public PipelineApi(StreamPipesClientConfig clientConfig) {
+    super(clientConfig, Pipeline.class);
+  }
+
+  public List<Pipeline> all() {
+      return getAll(getBaseResourcePath());
+  }
+
+  public Message start(String pipelineId) {
+    return null;
+  }
 
   @Override
-  protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
-    return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
+  protected StreamPipesApiPath getBaseResourcePath() {
+    return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+            .addToPath("pipelines")
+            .addToPath("own");
   }
 }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
new file mode 100644
index 0000000..9191b3e
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.template.PipelineElementTemplate;
+
+import java.util.List;
+
+public class PipelineElementTemplateApi extends AbstractClientApi<PipelineElementTemplate>
+        implements CRUDApi<String, PipelineElementTemplate> {
+
+  public PipelineElementTemplateApi(StreamPipesClientConfig clientConfig) {
+    super(clientConfig, PipelineElementTemplate.class);
+  }
+
+  @Override
+  public PipelineElementTemplate get(String id) {
+    return getSingle(getBaseResourcePath().addToPath(id));
+  }
+
+  @Override
+  public List<PipelineElementTemplate> all() {
+    return getAll(getBaseResourcePath());
+  }
+
+  @Override
+  public void create(PipelineElementTemplate element) {
+    post(getBaseResourcePath(), element);
+  }
+
+  @Override
+  public void delete(String s) {
+
+  }
+
+  @Override
+  public void update(PipelineElementTemplate element) {
+
+  }
+
+  @Override
+  protected StreamPipesApiPath getBaseResourcePath() {
+    return StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
+            .addToPath("pipeline-element-templates");
+  }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataSinkApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataSinkApi.java
index 1dfb5f8..7eb3d9e 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataSinkApi.java
@@ -15,19 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
 
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public interface SupportsDataSinkApi {
 
-import javax.ws.rs.Path;
-import java.util.Map;
-
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
-  @Override
-  protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
-    return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
-  }
+  DataSinkApi sinks();
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataStreamApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataStreamApi.java
index 1dfb5f8..e4ff907 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsDataStreamApi.java
@@ -15,19 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
 
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public interface SupportsDataStreamApi {
 
-import javax.ws.rs.Path;
-import java.util.Map;
-
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
-  @Override
-  protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
-    return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
-  }
+  DataStreamApi streams();
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java
index 1dfb5f8..7daf809 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineApi.java
@@ -15,19 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
 
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public interface SupportsPipelineApi {
 
-import javax.ws.rs.Path;
-import java.util.Map;
-
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
-  @Override
-  protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
-    return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
-  }
+  PipelineApi pipelines();
 }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineElementTemplateApi.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineElementTemplateApi.java
index 1dfb5f8..a9338a6 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/SupportsPipelineElementTemplateApi.java
@@ -15,19 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.api;
 
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+public interface SupportsPipelineElementTemplateApi {
 
-import javax.ws.rs.Path;
-import java.util.Map;
-
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
-  @Override
-  protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
-    return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
-  }
+  PipelineElementTemplateApi pipelineElementTemplates();
 }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/GetRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/GetRequest.java
new file mode 100644
index 0000000..47f936d
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/GetRequest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.io.IOException;
+import java.util.List;
+
+public class GetRequest<T> extends HttpRequest {
+
+  private Class<T> targetClass;
+
+  public GetRequest(StreamPipesClientConfig clientConfig,
+                    StreamPipesApiPath apiPath,
+                    Class<T> targetClass) {
+    super(clientConfig, apiPath);
+    this.targetClass = targetClass;
+  }
+
+  public T receiveSingle() throws SpRuntimeException {
+    return deserialize(executeRequest(), targetClass);
+  }
+
+  public List<T> receiveList() throws SpRuntimeException {
+    return deserializeList(executeRequest(), targetClass);
+  }
+
+  private String executeRequest() throws SpRuntimeException {
+    try {
+      return Request
+              .Get(makeUrl())
+              .setHeaders(standardHeaders())
+              .execute()
+              .returnContent()
+              .asString();
+    } catch (IOException e) {
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
new file mode 100644
index 0000000..f4ed4f7
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import org.apache.http.Header;
+import org.apache.streampipes.client.http.header.Headers;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class HttpRequest {
+
+  private StreamPipesClientConfig clientConfig;
+  private StreamPipesApiPath apiPath;
+  private ObjectMapper objectMapper;
+
+  public HttpRequest(StreamPipesClientConfig clientConfig, StreamPipesApiPath apiPath) {
+    this.clientConfig = clientConfig;
+    this.objectMapper = clientConfig.getSerializer();
+    this.apiPath = apiPath;
+  }
+
+  protected Header[] standardHeaders() {
+    List<Header> headers = new ArrayList<>();
+    headers.add(Headers.auth(clientConfig.getCredentials()));
+    headers.add(Headers.acceptJson());
+    return headers.toArray(new Header[0]);
+  }
+
+  protected Header[] standardPostHeaders() {
+    List<Header> headers = new ArrayList<>(Arrays.asList(standardHeaders()));
+    headers.add(Headers.contentTypeJson());
+    return headers.toArray(new Header[0]);
+  }
+
+  protected <T> String serialize(T element) {
+    try {
+      return objectMapper.writeValueAsString(element);
+    } catch (JsonProcessingException e) {
+      throw new SpRuntimeException(e.getCause());
+    }
+  }
+
+  protected <T> T deserialize(String json, Class<T> targetClass) throws SpRuntimeException {
+    try {
+      return objectMapper.readValue(json, targetClass);
+    } catch (JsonProcessingException e) {
+      throw new SpRuntimeException(e.getCause());
+    }
+  }
+
+  protected <T> List<T> deserializeList(String json, Class<T> targetClass) throws SpRuntimeException {
+    CollectionType listType = objectMapper.getTypeFactory()
+            .constructCollectionType(List.class, targetClass);
+    try {
+      return objectMapper.readValue(json, listType);
+    } catch (JsonProcessingException e) {
+      throw new SpRuntimeException(e.getCause());
+    }
+  }
+
+  protected String makeUrl() {
+    return makeProtocol() + clientConfig.getStreamPipesHost()
+            + ":"
+            + clientConfig.getStreamPipesPort()
+            + "/"
+            + apiPath.toString();
+  }
+
+  private String makeProtocol() {
+    return clientConfig.isHttpsDisabled() ? "http://" : "https://";
+  }
+
+
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequest.java
new file mode 100644
index 0000000..3f9d489
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.io.IOException;
+
+public abstract class PostRequest<T, O> extends HttpRequest {
+
+  protected T element;
+
+  public PostRequest(StreamPipesClientConfig clientConfig,
+                     StreamPipesApiPath apiPath,
+                     T element) {
+    super(clientConfig, apiPath);
+    this.element = element;
+  }
+
+  public abstract O postItem();
+
+  protected Response executeRequest() throws SpRuntimeException {
+    try {
+      return Request
+              .Post(makeUrl())
+              .setHeaders(standardPostHeaders())
+              .bodyString(serialize(element), ContentType.APPLICATION_JSON)
+              .execute();
+    } catch (IOException e) {
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+
+
+
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithPayloadResponse.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithPayloadResponse.java
new file mode 100644
index 0000000..3b68102
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithPayloadResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+
+import java.io.IOException;
+
+public class PostRequestWithPayloadResponse<T, O> extends PostRequest<T, O> {
+
+  private Class<O> responseClass;
+
+  public PostRequestWithPayloadResponse(StreamPipesClientConfig clientConfig,
+                                        StreamPipesApiPath apiPath,
+                                        T element,
+                                        Class<O> responseClass) {
+    super(clientConfig, apiPath, element);
+    this.responseClass = responseClass;
+  }
+
+  public O postItem() throws SpRuntimeException {
+    return deserialize(executeAndReturnContent(), responseClass);
+  }
+
+  private String executeAndReturnContent() {
+    try {
+      return executeRequest().returnContent().asString();
+    } catch (IOException e) {
+      throw new SpRuntimeException(e.getMessage());
+    }
+  }
+
+
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithoutPayloadResponse.java
similarity index 58%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithoutPayloadResponse.java
index 1dfb5f8..51f54a7 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PostRequestWithoutPayloadResponse.java
@@ -15,19 +15,22 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.http;
 
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
 
-import javax.ws.rs.Path;
-import java.util.Map;
+public class PostRequestWithoutPayloadResponse<T> extends PostRequest<T, Void> {
 
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+  public PostRequestWithoutPayloadResponse(StreamPipesClientConfig clientConfig,
+                                           StreamPipesApiPath apiPath,
+                                           T element) {
+    super(clientConfig, apiPath, element);
+  }
 
   @Override
-  protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
-    return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
+  public Void postItem() {
+    executeRequest();
+    return null;
   }
 }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/header/Headers.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/header/Headers.java
new file mode 100644
index 0000000..58709df
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/header/Headers.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.http.header;
+
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.apache.streampipes.client.StreamPipesCredentials;
+
+public class Headers {
+
+  private static final String AUTHORIZATION = "Authorization";
+  private static final String ACCEPT = "Accept";
+  private static final String CONTENT_TYPE = "Content-type";
+  private static final String BEARER = "Bearer ";
+  private static final String APPLICATION_JSON_TYPE = "application/json";
+
+  public static Header auth(StreamPipesCredentials credentials) {
+    return makeHeader(AUTHORIZATION, BEARER + credentials.getApiKey());
+  }
+
+  public static Header acceptJson() {
+    return makeHeader(ACCEPT, APPLICATION_JSON_TYPE);
+  }
+
+  private static Header makeHeader(String name, String value) {
+    return new BasicHeader(name, value);
+  }
+
+  public static Header contentTypeJson() {
+    return makeHeader(CONTENT_TYPE, APPLICATION_JSON_TYPE);
+  }
+}
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-client/src/main/java/org/apache/streampipes/client/live/EventProcessor.java
similarity index 60%
copy from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/live/EventProcessor.java
index 1dfb5f8..2af4cb9 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/live/EventProcessor.java
@@ -15,19 +15,11 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.container.api;
+package org.apache.streampipes.client.live;
 
-import org.apache.streampipes.container.declarer.PipelineTemplateDeclarer;
-import org.apache.streampipes.container.init.DeclarersSingleton;
+import org.apache.streampipes.model.runtime.Event;
 
-import javax.ws.rs.Path;
-import java.util.Map;
+public interface EventProcessor {
 
-@Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
-
-  @Override
-  protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
-    return DeclarersSingleton.getInstance().getPipelineTemplateDeclarers();
-  }
+  void onEvent(Event event);
 }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java
new file mode 100644
index 0000000..b587f0d
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.model;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.dataformat.SpDataFormatFactory;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamPipesClientConfig {
+
+  private StreamPipesCredentials credentials;
+  private String streamPipesHost;
+  private Integer streamPipesPort;
+  private ObjectMapper serializer;
+  private boolean httpsDisabled;
+  private List<SpDataFormatFactory> registeredDataFormats;
+
+  public StreamPipesClientConfig(StreamPipesCredentials credentials,
+                                 String streamPipesHost,
+                                 Integer streamPipesPort,
+                                 boolean httpsDisabled) {
+    this.credentials = credentials;
+    this.streamPipesHost = streamPipesHost;
+    this.streamPipesPort = streamPipesPort;
+    this.httpsDisabled = httpsDisabled;
+    this.serializer = JacksonSerializer.getObjectMapper();
+    this.registeredDataFormats = new ArrayList<>();
+  }
+
+  public StreamPipesCredentials getCredentials() {
+    return credentials;
+  }
+
+  public String getStreamPipesHost() {
+    return streamPipesHost;
+  }
+
+  public Integer getStreamPipesPort() {
+    return streamPipesPort;
+  }
+
+  public ObjectMapper getSerializer() {
+    return serializer;
+  }
+
+  public boolean isHttpsDisabled() {
+    return httpsDisabled;
+  }
+
+  public void addDataFormat(SpDataFormatFactory spDataFormatFactory) {
+    this.registeredDataFormats.add(spDataFormatFactory);
+  }
+
+  public List<SpDataFormatFactory> getRegisteredDataFormats() {
+    return registeredDataFormats;
+  }
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java b/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java
new file mode 100644
index 0000000..c3764af
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.util;
+
+import org.apache.streampipes.client.StreamPipesCredentials;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.StringJoiner;
+
+public class StreamPipesApiPath {
+
+  private List<String> pathItems;
+  private static final List<String> BaseApiPathV2 = Arrays.asList("streampipes-backend", "api", "v2");
+
+  public static StreamPipesApiPath fromBaseApiPath() {
+    return new StreamPipesApiPath(BaseApiPathV2);
+  }
+
+  public static StreamPipesApiPath fromUserApiPath(StreamPipesCredentials credentials) {
+    List<String> authPath = new ArrayList<>();
+    authPath.addAll(BaseApiPathV2);
+    authPath.addAll(Arrays.asList("users", credentials.getUsername()));
+    return new StreamPipesApiPath(authPath);
+  }
+
+
+  private StreamPipesApiPath(List<String> initialPathItems) {
+    this.pathItems = initialPathItems;
+  }
+
+  public StreamPipesApiPath addToPath(String pathItem) {
+    this.pathItems.add(pathItem);
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    StringJoiner joiner = new StringJoiner("/");
+    pathItems.forEach(joiner::add);
+    return joiner.toString();
+  }
+}
diff --git a/streampipes-client/src/test/java/org/apache/streampipes/client/TestStreamPipesClient.java b/streampipes-client/src/test/java/org/apache/streampipes/client/TestStreamPipesClient.java
new file mode 100644
index 0000000..74e622e
--- /dev/null
+++ b/streampipes-client/src/test/java/org/apache/streampipes/client/TestStreamPipesClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.template.PipelineElementTemplate;
+import org.apache.streampipes.model.template.PipelineElementTemplateConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestStreamPipesClient {
+
+  public static void main(String[] args) {
+    StreamPipesCredentials credentials = StreamPipesCredentials
+            .from(System.getenv("user"), System.getenv("apiKey"));
+
+    StreamPipesClient client = StreamPipesClient
+            .create("localhost", 8082, credentials, true);
+
+    List<Pipeline> pipelines = client.pipelines().all();
+
+    pipelines.forEach(pipeline -> System.out.println(pipeline.getName()));
+
+    PipelineElementTemplate template = new PipelineElementTemplate();
+    template.setTemplateName("Test");
+    template.setTemplateDescription("description");
+    template.setBasePipelineElementAppId("org.apache.streampipes.sinks.internal.jvm.dashboard");
+    Map<String, PipelineElementTemplateConfig> configs = new HashMap<>();
+    PipelineElementTemplateConfig config = new PipelineElementTemplateConfig();
+    config.setValue("test");
+    configs.put("visualization-name", config);
+    template.setTemplateConfigs(configs);
+
+    //client.pipelineElementTemplates().create(template);
+
+    List<PipelineElementTemplate> templates = client.pipelineElementTemplates().all();
+
+    templates.forEach(t -> System.out.println(t.getTemplateName()));
+
+    List<DataSinkInvocation> dataSinks = client.sinks().all();
+
+//    System.out.println(dataSinks.size());
+//    System.out.println(template.getCouchDbId());
+//    DataSinkInvocation invocation = client.sinks().getDataSinkForPipelineElement(templates.get(0).getCouchDbId(), dataSinks.get(0));
+//    System.out.println(invocation.getName());
+
+    List<SpDataStream> dataStreams = client.streams().all();
+    System.out.println(dataStreams.size());
+
+    //SpDataStream myStream = dataStreams.stream().filter(stream -> stream.getCorrespondingAdapterId().equals("org.apache.streampipes.connect.adapters.simulator.randomdatastream")).findFirst().get();
+
+    client.registerDataFormat(new JsonDataFormatFactory());
+
+    client.streams().subscribe(dataStreams.get(1), event -> MapUtils.debugPrint(System.out, "event", event.getRaw()));
+  }
+}
diff --git a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
index 7a267a7..cd27658 100644
--- a/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
+++ b/streampipes-container-extensions/src/main/java/org/apache/streampipes/container/extensions/ExtensionsResourceConfig.java
@@ -31,7 +31,7 @@ public class ExtensionsResourceConfig extends ResourceConfig {
         register(DataProcessorPipelineElementResource.class);
         register(DataStreamPipelineElementResource.class);
         register(WelcomePage.class);
-        register(PipelineElementTemplateResource.class);
+        register(PipelineTemplateResource.class);
 
         //register(WelcomePageWorker.class);
         register(GuessResource.class);
diff --git a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
index 920c11d..1ef0379 100644
--- a/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
+++ b/streampipes-container-standalone/src/main/java/org/apache/streampipes/container/standalone/init/PipelineElementContainerResourceConfig.java
@@ -30,7 +30,7 @@ public class PipelineElementContainerResourceConfig extends ResourceConfig {
     register(DataProcessorPipelineElementResource.class);
     register(DataStreamPipelineElementResource.class);
     register(WelcomePage.class);
-    register(PipelineElementTemplateResource.class);
+    register(PipelineTemplateResource.class);
 
     register(JacksonSerializationProvider.class);
   }
diff --git a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java b/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineTemplateResource.java
similarity index 91%
rename from streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
rename to streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineTemplateResource.java
index 1dfb5f8..0a930ae 100644
--- a/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineElementTemplateResource.java
+++ b/streampipes-container/src/main/java/org/apache/streampipes/container/api/PipelineTemplateResource.java
@@ -24,7 +24,7 @@ import javax.ws.rs.Path;
 import java.util.Map;
 
 @Path("/template")
-public class PipelineElementTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
+public class PipelineTemplateResource extends AbstractPipelineElementResource<PipelineTemplateDeclarer> {
 
   @Override
   protected Map<String, PipelineTemplateDeclarer> getElementDeclarers() {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplate.java b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplate.java
index 8a5507f..96c1078 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplate.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/template/PipelineElementTemplate.java
@@ -22,6 +22,7 @@ import com.google.gson.annotations.SerializedName;
 import org.apache.streampipes.model.shared.annotation.TsModel;
 
 import java.util.Map;
+import java.util.UUID;
 
 @TsModel
 public class PipelineElementTemplate {
@@ -47,6 +48,7 @@ public class PipelineElementTemplate {
   }
 
   public PipelineElementTemplate() {
+    this.couchDbId = UUID.randomUUID().toString();
   }
 
   public String getTemplateName() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index d522345..a07e50c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.manager.setup;
 
-import org.apache.streampipes.container.util.Util;
 import org.apache.streampipes.model.client.endpoint.RdfEndpoint;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.message.Notifications;
@@ -154,8 +153,12 @@ public class CouchDbInstallationStep implements InstallationStep {
             MapReduce usernameFunction = new MapReduce();
             usernameFunction.setMap("function(doc) { if(doc.email) { emit(doc.email, doc); } }");
 
+            MapReduce tokenFunction = new MapReduce();
+            tokenFunction.setMap("function(doc) { if (doc.userApiTokens) { doc.userApiTokens.forEach(function(token) { emit(token.hashedToken, doc.email); });}}");
+
             views.put("password", passwordFunction);
             views.put("username", usernameFunction);
+            views.put("token", tokenFunction);
 
             userDocument.setViews(views);
             Response resp = Utils.getCouchDbUserClient().design().synchronizeWithDb(userDocument);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
index 4cea7b4..fc55d28 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
@@ -114,7 +114,7 @@ public class PipelineElementTemplateVisitor implements StaticPropertyVisitor {
   }
 
   private Object getValue(String internalName) {
-    return configs.get(internalName);
+    return configs.get(internalName).getValue();
   }
 
   private boolean hasKey(String internalName) {
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/PipelineElementTemplateResource.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/PipelineElementTemplateResource.java
index a922f37..4b86018 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/PipelineElementTemplateResource.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/PipelineElementTemplateResource.java
@@ -24,7 +24,6 @@ import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.template.PipelineElementTemplate;
 import org.apache.streampipes.rest.impl.AbstractRestInterface;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-import retrofit2.http.Query;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
@@ -37,7 +36,7 @@ public class PipelineElementTemplateResource extends AbstractRestInterface {
   @Produces(MediaType.APPLICATION_JSON)
   @JacksonSerialized
   public Response getAll(@QueryParam("appId") String appId) {
-    if (appId != null) {
+    if (appId == null) {
       return ok(getPipelineElementTemplateStorage().getAll());
     } else {
       return ok(getPipelineElementTemplateStorage().getPipelineElementTemplatesforAppId(appId));
@@ -56,7 +55,7 @@ public class PipelineElementTemplateResource extends AbstractRestInterface {
     }
   }
 
-  @GET
+  @POST
   @Consumes(MediaType.APPLICATION_JSON)
   @JacksonSerialized
   public Response create(PipelineElementTemplate entity) {
@@ -95,23 +94,23 @@ public class PipelineElementTemplateResource extends AbstractRestInterface {
   @Consumes(MediaType.APPLICATION_JSON)
   @JacksonSerialized
   public Response getPipelineElementForTemplate(@PathParam("id") String id,
-                                                @QueryParam("overwriteNames") boolean overwriteNameAndDescription,
+                                                @QueryParam("overwriteNames") String overwriteNameAndDescription,
                                                 DataSinkInvocation invocation) {
     PipelineElementTemplate template = getPipelineElementTemplateStorage().getElementById(id);
-    return ok(new DataSinkTemplateHandler(template, invocation, overwriteNameAndDescription)
+    return ok(new DataSinkTemplateHandler(template, invocation, Boolean.parseBoolean(overwriteNameAndDescription))
             .applyTemplateOnPipelineElement());
   }
 
   @POST
-  @Path("{id}/sink")
+  @Path("{id}/processor")
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.APPLICATION_JSON)
   @JacksonSerialized
   public Response getPipelineElementForTemplate(@PathParam("id") String id,
-                                                @Query("overwriteNames") boolean overwriteNameAndDescription,
+                                                @QueryParam("overwriteNames") String overwriteNameAndDescription,
                                                 DataProcessorInvocation invocation) {
     PipelineElementTemplate template = getPipelineElementTemplateStorage().getElementById(id);
-    return ok(new DataProcessorTemplateHandler(template, invocation, overwriteNameAndDescription)
+    return ok(new DataProcessorTemplateHandler(template, invocation, Boolean.parseBoolean(overwriteNameAndDescription))
             .applyTemplateOnPipelineElement());
   }