You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/10/02 08:56:25 UTC
[incubator-streampipes] 01/02: [STREAMPIPES-436] Inject StreamPipes
client into pipeline element services, improve client configuration
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-426
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 4aec211fd226804827a4b0b8f383e049681372bb
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Oct 2 10:54:00 2021 +0200
[STREAMPIPES-436] Inject StreamPipes client into pipeline element services, improve client configuration
---
.../backend/UnauthenticatedInterfaces.java | 3 +
.../streampipes/client/StreamPipesClient.java | 32 +++++++--
.../streampipes/client/api/AbstractClientApi.java | 43 +++---------
...tClientApi.java => AbstractTypedClientApi.java} | 31 ++-------
.../streampipes/client/api/CustomRequestApi.java | 27 +++-----
.../streampipes/client/api/DataProcessorApi.java | 2 +-
.../apache/streampipes/client/api/DataSinkApi.java | 2 +-
.../streampipes/client/api/DataStreamApi.java | 2 +-
.../apache/streampipes/client/api/PipelineApi.java | 2 +-
.../client/api/PipelineElementTemplateApi.java | 2 +-
.../streampipes/client/http/HttpRequest.java | 22 +++---
.../model/ClientConnectionConfigResolver.java | 33 ++++-----
.../client/model/ClientConnectionUrlResolver.java | 23 ++-----
.../client/model/StreamPipesClientConfig.java | 36 ++--------
...java => StreamPipesClientConnectionConfig.java} | 42 ++++--------
.../client/util/StreamPipesApiPath.java | 14 ++--
.../apache/streampipes/commons/constants/Envs.java | 4 +-
.../impl/datalake/DataLakeNoUserResourceV3.java | 2 +-
.../wrapper/declarer/PipelineElementDeclarer.java | 3 +-
.../wrapper/utils/StreamPipesClientResolver.java | 21 ++----
...StreamPipesClientRuntimeConnectionResolver.java | 79 ++++++++++++++++++++++
21 files changed, 211 insertions(+), 214 deletions(-)
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
index 68dba7a..3971e30 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
@@ -26,6 +26,9 @@ public class UnauthenticatedInterfaces {
return Arrays.asList(
"/api/v2/setup/configured",
"/api/v2/auth/login",
+ "/api/v2/pe/*/assets/icon",
+ "/api/v2/connect/master/description/*/assets/icon",
+ "/api/v2/connect/*/master/administration/**",
"/api/auth/**",
"/oauth2/**",
"/api/all",
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
index 50aac28..11a68a6 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -19,7 +19,9 @@ package org.apache.streampipes.client;
import org.apache.streampipes.client.api.*;
import org.apache.streampipes.client.credentials.CredentialsProvider;
+import org.apache.streampipes.client.model.ClientConnectionUrlResolver;
import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.model.StreamPipesClientConnectionConfig;
import org.apache.streampipes.dataformat.SpDataFormatFactory;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
@@ -36,6 +38,14 @@ public class StreamPipesClient implements SupportsPipelineApi,
private StreamPipesClientConfig config;
/**
+ * Create a new StreamPipes API client with a runtime connection resolver
+ * @param connectionConfig A ClientConnectionUrlResolver providing the base URL and credentials
+ */
+ public static StreamPipesClient create(ClientConnectionUrlResolver connectionConfig) {
+ return new StreamPipesClient(connectionConfig);
+ }
+
+ /**
* Create a new StreamPipes API client with default port and custom HTTPS settings
* @param streamPipesHost The hostname of the StreamPipes instance without scheme
* @param credentials The credentials object
@@ -83,14 +93,18 @@ public class StreamPipesClient implements SupportsPipelineApi,
return new StreamPipesClient(streamPipesHost, streamPipesPort, credentials, httpsDisabled);
}
+ private StreamPipesClient(ClientConnectionUrlResolver connectionConfig) {
+ this.config = new StreamPipesClientConfig(connectionConfig);
+ this.registerDataFormat(new JsonDataFormatFactory());
+ this.registerDataFormat(new FstDataFormatFactory());
+ this.registerDataFormat(new CborDataFormatFactory());
+ }
+
private StreamPipesClient(String streamPipesHost,
Integer streamPipesPort,
CredentialsProvider credentials,
boolean httpsDisabled) {
- this.config = new StreamPipesClientConfig(credentials, streamPipesHost, streamPipesPort, httpsDisabled);
- this.registerDataFormat(new JsonDataFormatFactory());
- this.registerDataFormat(new FstDataFormatFactory());
- this.registerDataFormat(new CborDataFormatFactory());
+ this(new StreamPipesClientConnectionConfig(credentials, streamPipesHost, streamPipesPort, httpsDisabled));
}
/**
@@ -102,13 +116,17 @@ public class StreamPipesClient implements SupportsPipelineApi,
}
public CredentialsProvider getCredentials() {
- return config.getCredentials();
+ return config.getConnectionConfig().getCredentials();
}
public StreamPipesClientConfig getConfig() {
return config;
}
+ public ClientConnectionUrlResolver getConnectionConfig() {
+ return config.getConnectionConfig();
+ }
+
/**
* Get API to work with pipelines
* @return {@link org.apache.streampipes.client.api.PipelineApi}
@@ -153,4 +171,8 @@ public class StreamPipesClient implements SupportsPipelineApi,
public DataProcessorApi processors() {
return new DataProcessorApi(config);
}
+
+ public CustomRequestApi customRequest() {
+ return new CustomRequestApi(config);
+ }
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
index 563bbca..b2ccc12 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -17,45 +17,20 @@
*/
package org.apache.streampipes.client.api;
-import org.apache.streampipes.client.http.*;
+import org.apache.streampipes.client.http.DeleteRequest;
+import org.apache.streampipes.client.http.PostRequestWithPayloadResponse;
+import org.apache.streampipes.client.http.PostRequestWithoutPayloadResponse;
import org.apache.streampipes.client.model.StreamPipesClientConfig;
-import org.apache.streampipes.client.serializer.ListSerializer;
import org.apache.streampipes.client.serializer.ObjectSerializer;
import org.apache.streampipes.client.serializer.Serializer;
import org.apache.streampipes.client.util.StreamPipesApiPath;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import java.util.List;
-
-public abstract class AbstractClientApi<T> {
+public class AbstractClientApi {
protected StreamPipesClientConfig clientConfig;
- private Class<T> targetClass;
- public AbstractClientApi(StreamPipesClientConfig clientConfig, Class<T> targetClass) {
+ public AbstractClientApi(StreamPipesClientConfig clientConfig) {
this.clientConfig = clientConfig;
- this.targetClass = targetClass;
- }
-
- protected List<T> getAll(StreamPipesApiPath apiPath) throws SpRuntimeException {
- ListSerializer<Void, T> serializer = new ListSerializer<>();
- return new GetRequest<>(clientConfig, apiPath, targetClass, serializer).executeRequest();
- }
-
- protected T getSingle(StreamPipesApiPath apiPath) throws SpRuntimeException {
- ObjectSerializer<Void, T> serializer = new ObjectSerializer<>();
- return new GetRequest<>(clientConfig, apiPath, targetClass, serializer).executeRequest();
- }
-
- protected <O> O getSingle(StreamPipesApiPath apiPath, Class<O> targetClass) throws SpRuntimeException {
- ObjectSerializer<Void, O> serializer = new ObjectSerializer<>();
- return new GetRequest<>(clientConfig, apiPath, targetClass, serializer).executeRequest();
- }
-
- protected <O> O post(StreamPipesApiPath apiPath, T object, Class<O> responseClass) {
- ObjectSerializer<T, O> serializer = new ObjectSerializer<>();
- return new PostRequestWithPayloadResponse<>(clientConfig, apiPath, serializer, object, responseClass)
- .executeRequest();
}
protected <O> O post(StreamPipesApiPath apiPath, Class<O> responseClass) {
@@ -63,7 +38,7 @@ public abstract class AbstractClientApi<T> {
return new PostRequestWithPayloadResponse<>(clientConfig, apiPath, serializer, responseClass).executeRequest();
}
- protected void post(StreamPipesApiPath apiPath, T object) {
+ protected <T> void post(StreamPipesApiPath apiPath, T object) {
ObjectSerializer<T, Void> serializer = new ObjectSerializer<>();
new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer, object).executeRequest();
}
@@ -73,5 +48,9 @@ public abstract class AbstractClientApi<T> {
return new DeleteRequest<>(clientConfig, apiPath, responseClass, serializer).executeRequest();
}
- protected abstract StreamPipesApiPath getBaseResourcePath();
+ protected <T, O> O post(StreamPipesApiPath apiPath, T object, Class<O> responseClass) {
+ ObjectSerializer<T, O> serializer = new ObjectSerializer<>();
+ return new PostRequestWithPayloadResponse<>(clientConfig, apiPath, serializer, object, responseClass)
+ .executeRequest();
+ }
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractTypedClientApi.java
similarity index 60%
copy from streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractTypedClientApi.java
index 563bbca..eae2650 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractTypedClientApi.java
@@ -17,23 +17,21 @@
*/
package org.apache.streampipes.client.api;
-import org.apache.streampipes.client.http.*;
+import org.apache.streampipes.client.http.GetRequest;
import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.client.serializer.ListSerializer;
import org.apache.streampipes.client.serializer.ObjectSerializer;
-import org.apache.streampipes.client.serializer.Serializer;
import org.apache.streampipes.client.util.StreamPipesApiPath;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import java.util.List;
-public abstract class AbstractClientApi<T> {
+public abstract class AbstractTypedClientApi<T> extends AbstractClientApi {
- protected StreamPipesClientConfig clientConfig;
private Class<T> targetClass;
- public AbstractClientApi(StreamPipesClientConfig clientConfig, Class<T> targetClass) {
- this.clientConfig = clientConfig;
+ public AbstractTypedClientApi(StreamPipesClientConfig clientConfig, Class<T> targetClass) {
+ super(clientConfig);
this.targetClass = targetClass;
}
@@ -52,26 +50,5 @@ public abstract class AbstractClientApi<T> {
return new GetRequest<>(clientConfig, apiPath, targetClass, serializer).executeRequest();
}
- protected <O> O post(StreamPipesApiPath apiPath, T object, Class<O> responseClass) {
- ObjectSerializer<T, O> serializer = new ObjectSerializer<>();
- return new PostRequestWithPayloadResponse<>(clientConfig, apiPath, serializer, object, responseClass)
- .executeRequest();
- }
-
- protected <O> O post(StreamPipesApiPath apiPath, Class<O> responseClass) {
- ObjectSerializer<Void, O> serializer = new ObjectSerializer<>();
- return new PostRequestWithPayloadResponse<>(clientConfig, apiPath, serializer, responseClass).executeRequest();
- }
-
- protected void post(StreamPipesApiPath apiPath, T object) {
- ObjectSerializer<T, Void> serializer = new ObjectSerializer<>();
- new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer, object).executeRequest();
- }
-
- protected <O> O delete(StreamPipesApiPath apiPath, Class<O> responseClass) {
- Serializer<Void, O, O> serializer = new ObjectSerializer<>();
- return new DeleteRequest<>(clientConfig, apiPath, responseClass, serializer).executeRequest();
- }
-
protected abstract StreamPipesApiPath getBaseResourcePath();
}
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/CustomRequestApi.java
similarity index 62%
copy from streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/api/CustomRequestApi.java
index 68dba7a..713e39f 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/CustomRequestApi.java
@@ -15,24 +15,19 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.backend;
+package org.apache.streampipes.client.api;
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
-public class UnauthenticatedInterfaces {
+public class CustomRequestApi extends AbstractClientApi {
- public static Collection<String> get() {
- return Arrays.asList(
- "/api/v2/setup/configured",
- "/api/v2/auth/login",
- "/api/auth/**",
- "/oauth2/**",
- "/api/all",
- "/error",
- "/",
- "/streampipes-backend/",
- "/streampipes-backend/index.html"
- );
+ public CustomRequestApi(StreamPipesClientConfig clientConfig) {
+ super(clientConfig);
}
+
+ public <T> void sendPost(String apiPath, T payload) {
+ post(StreamPipesApiPath.fromStreamPipesBasePath(apiPath), payload);
+ }
+
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
index ed18e66..cd64795 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
@@ -29,7 +29,7 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
import java.util.List;
-public class DataProcessorApi extends AbstractClientApi<DataProcessorInvocation> implements CRUDApi<String, DataProcessorInvocation>{
+public class DataProcessorApi extends AbstractTypedClientApi<DataProcessorInvocation> implements CRUDApi<String, DataProcessorInvocation>{
public DataProcessorApi(StreamPipesClientConfig clientConfig) {
super(clientConfig, DataProcessorInvocation.class);
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
index d904266..586591b 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
@@ -28,7 +28,7 @@ import org.apache.streampipes.model.graph.DataSinkInvocation;
import java.util.List;
-public class DataSinkApi extends AbstractClientApi<DataSinkInvocation> implements CRUDApi<String, DataSinkInvocation> {
+public class DataSinkApi extends AbstractTypedClientApi<DataSinkInvocation> implements CRUDApi<String, DataSinkInvocation> {
public DataSinkApi(StreamPipesClientConfig clientConfig) {
super(clientConfig, DataSinkInvocation.class);
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
index b5e2d35..3ed9d2c 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataStreamApi.java
@@ -29,7 +29,7 @@ import org.apache.streampipes.model.message.Message;
import java.net.URLEncoder;
import java.util.List;
-public class DataStreamApi extends AbstractClientApi<SpDataStream> implements CRUDApi<String, SpDataStream> {
+public class DataStreamApi extends AbstractTypedClientApi<SpDataStream> implements CRUDApi<String, SpDataStream> {
public DataStreamApi(StreamPipesClientConfig clientConfig) {
super(clientConfig, SpDataStream.class);
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
index 1f0d949..d0761d3 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineApi.java
@@ -25,7 +25,7 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import java.util.List;
-public class PipelineApi extends AbstractClientApi<Pipeline> implements CRUDApi<String, Pipeline> {
+public class PipelineApi extends AbstractTypedClientApi<Pipeline> implements CRUDApi<String, Pipeline> {
public PipelineApi(StreamPipesClientConfig clientConfig) {
super(clientConfig, Pipeline.class);
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
index 914617c..ce64a45 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/PipelineElementTemplateApi.java
@@ -23,7 +23,7 @@ import org.apache.streampipes.model.template.PipelineElementTemplate;
import java.util.List;
-public class PipelineElementTemplateApi extends AbstractClientApi<PipelineElementTemplate>
+public class PipelineElementTemplateApi extends AbstractTypedClientApi<PipelineElementTemplate>
implements CRUDApi<String, PipelineElementTemplate> {
public PipelineElementTemplateApi(StreamPipesClientConfig clientConfig) {
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
index 4d80ef0..f199262 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/HttpRequest.java
@@ -22,6 +22,7 @@ import org.apache.http.*;
import org.apache.http.client.fluent.Request;
import org.apache.http.util.EntityUtils;
import org.apache.streampipes.client.http.header.Headers;
+import org.apache.streampipes.client.model.ClientConnectionUrlResolver;
import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.client.serializer.Serializer;
import org.apache.streampipes.client.util.StreamPipesApiPath;
@@ -31,11 +32,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.StringJoiner;
public abstract class HttpRequest<SO, DSO, DT> {
private final StreamPipesClientConfig clientConfig;
+ private final ClientConnectionUrlResolver connectionConfig;
private final StreamPipesApiPath apiPath;
private final ObjectMapper objectMapper;
private final Serializer<SO, DSO, DT> serializer;
@@ -44,13 +45,14 @@ public abstract class HttpRequest<SO, DSO, DT> {
StreamPipesApiPath apiPath,
Serializer<SO, DSO, DT> serializer) {
this.clientConfig = clientConfig;
+ this.connectionConfig = clientConfig.getConnectionConfig();
this.objectMapper = clientConfig.getSerializer();
this.apiPath = apiPath;
this.serializer = serializer;
}
protected Header[] standardHeaders() {
- List<Header> headers = new ArrayList<>(clientConfig.getCredentials().makeHeaders());
+ List<Header> headers = new ArrayList<>(connectionConfig.getCredentials().makeHeaders());
headers.add(Headers.acceptJson());
return headers.toArray(new Header[0]);
}
@@ -66,15 +68,12 @@ public abstract class HttpRequest<SO, DSO, DT> {
}
protected String makeUrl(boolean includePath) {
- StringJoiner joiner = new StringJoiner("");
- joiner.add(makeProtocol() + clientConfig.getStreamPipesHost()
- + ":"
- + clientConfig.getStreamPipesPort());
+ String baseUrl = clientConfig.getConnectionConfig().getBaseUrl();
if (includePath) {
- joiner.add("/" + apiPath.toString());
+ baseUrl = baseUrl + "/" + apiPath.toString();
}
- return joiner.toString();
+ return baseUrl;
}
public DT executeRequest() throws SpRuntimeException {
@@ -86,7 +85,7 @@ public abstract class HttpRequest<SO, DSO, DT> {
return afterRequest(serializer, response.getEntity());
} else {
if (status.getStatusCode() == 401) {
- throw new SpRuntimeException(" 401 - Access to this resource is forbidden - did you provide a poper API key?");
+ throw new SpRuntimeException(" 401 - Access to this resource is forbidden - did you provide a poper API key or client secret?");
} else {
throw new SpRuntimeException(status.getStatusCode() + " - " + status.getReasonPhrase());
}
@@ -106,9 +105,4 @@ public abstract class HttpRequest<SO, DSO, DT> {
protected abstract DT afterRequest(Serializer<SO, DSO, DT> serializer, HttpEntity entity) throws IOException;
- private String makeProtocol() {
- return clientConfig.isHttpsDisabled() ? "http://" : "https://";
- }
-
-
}
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java b/streampipes-client/src/main/java/org/apache/streampipes/client/model/ClientConnectionConfigResolver.java
similarity index 60%
copy from streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/model/ClientConnectionConfigResolver.java
index 68dba7a..e28bbb7 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/model/ClientConnectionConfigResolver.java
@@ -15,24 +15,25 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.backend;
+package org.apache.streampipes.client.model;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.StringJoiner;
-public class UnauthenticatedInterfaces {
+public interface ClientConnectionConfigResolver extends ClientConnectionUrlResolver {
- public static Collection<String> get() {
- return Arrays.asList(
- "/api/v2/setup/configured",
- "/api/v2/auth/login",
- "/api/auth/**",
- "/oauth2/**",
- "/api/all",
- "/error",
- "/",
- "/streampipes-backend/",
- "/streampipes-backend/index.html"
- );
+ String getStreamPipesHost();
+
+ Integer getStreamPipesPort();
+
+ boolean isHttpsDisabled();
+
+ default String getBaseUrl() {
+ StringJoiner joiner = new StringJoiner("");
+ String protocol = isHttpsDisabled() ? "http://" : "https://";
+ joiner.add(protocol + getStreamPipesHost()
+ + ":"
+ + getStreamPipesPort());
+
+ return joiner.toString();
}
}
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java b/streampipes-client/src/main/java/org/apache/streampipes/client/model/ClientConnectionUrlResolver.java
similarity index 61%
copy from streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/model/ClientConnectionUrlResolver.java
index 68dba7a..ebfe053 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/model/ClientConnectionUrlResolver.java
@@ -15,24 +15,13 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.backend;
+package org.apache.streampipes.client.model;
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.streampipes.client.credentials.CredentialsProvider;
-public class UnauthenticatedInterfaces {
+public interface ClientConnectionUrlResolver {
- public static Collection<String> get() {
- return Arrays.asList(
- "/api/v2/setup/configured",
- "/api/v2/auth/login",
- "/api/auth/**",
- "/oauth2/**",
- "/api/all",
- "/error",
- "/",
- "/streampipes-backend/",
- "/streampipes-backend/index.html"
- );
- }
+ CredentialsProvider getCredentials();
+
+ String getBaseUrl();
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java
index 400d40a..ce117c0 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.client.model;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streampipes.client.credentials.CredentialsProvider;
import org.apache.streampipes.dataformat.SpDataFormatFactory;
import org.apache.streampipes.serializers.json.JacksonSerializer;
@@ -27,45 +26,20 @@ import java.util.List;
public class StreamPipesClientConfig {
- private CredentialsProvider credentials;
- private String streamPipesHost;
- private Integer streamPipesPort;
+ private ClientConnectionUrlResolver connectionConfig;
private ObjectMapper serializer;
- private boolean httpsDisabled;
private List<SpDataFormatFactory> registeredDataFormats;
- public StreamPipesClientConfig(CredentialsProvider credentials,
- String streamPipesHost,
- Integer streamPipesPort,
- boolean httpsDisabled) {
- this.credentials = credentials;
- this.streamPipesHost = streamPipesHost;
- this.streamPipesPort = streamPipesPort;
- this.httpsDisabled = httpsDisabled;
+ public StreamPipesClientConfig(ClientConnectionUrlResolver connectionConfig) {
+ this.connectionConfig = connectionConfig;
this.serializer = JacksonSerializer.getObjectMapper();
this.registeredDataFormats = new ArrayList<>();
}
- public CredentialsProvider getCredentials() {
- return credentials;
- }
-
- public String getStreamPipesHost() {
- return streamPipesHost;
- }
-
- public Integer getStreamPipesPort() {
- return streamPipesPort;
- }
-
public ObjectMapper getSerializer() {
return serializer;
}
- public boolean isHttpsDisabled() {
- return httpsDisabled;
- }
-
public void addDataFormat(SpDataFormatFactory spDataFormatFactory) {
this.registeredDataFormats.add(spDataFormatFactory);
}
@@ -73,4 +47,8 @@ public class StreamPipesClientConfig {
public List<SpDataFormatFactory> getRegisteredDataFormats() {
return registeredDataFormats;
}
+
+ public ClientConnectionUrlResolver getConnectionConfig() {
+ return connectionConfig;
+ }
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConnectionConfig.java
similarity index 55%
copy from streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java
copy to streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConnectionConfig.java
index 400d40a..bb37870 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConfig.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/model/StreamPipesClientConnectionConfig.java
@@ -17,60 +17,42 @@
*/
package org.apache.streampipes.client.model;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.streampipes.client.credentials.CredentialsProvider;
-import org.apache.streampipes.dataformat.SpDataFormatFactory;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-import java.util.ArrayList;
-import java.util.List;
+public class StreamPipesClientConnectionConfig implements ClientConnectionConfigResolver {
-public class StreamPipesClientConfig {
-
- private CredentialsProvider credentials;
+ private CredentialsProvider credentialsProvider;
private String streamPipesHost;
private Integer streamPipesPort;
- private ObjectMapper serializer;
private boolean httpsDisabled;
- private List<SpDataFormatFactory> registeredDataFormats;
- public StreamPipesClientConfig(CredentialsProvider credentials,
- String streamPipesHost,
- Integer streamPipesPort,
- boolean httpsDisabled) {
- this.credentials = credentials;
+ public StreamPipesClientConnectionConfig(CredentialsProvider credentialsProvider,
+ String streamPipesHost,
+ Integer streamPipesPort,
+ boolean httpsDisabled) {
+ this.credentialsProvider = credentialsProvider;
this.streamPipesHost = streamPipesHost;
this.streamPipesPort = streamPipesPort;
this.httpsDisabled = httpsDisabled;
- this.serializer = JacksonSerializer.getObjectMapper();
- this.registeredDataFormats = new ArrayList<>();
}
+ @Override
public CredentialsProvider getCredentials() {
- return credentials;
+ return credentialsProvider;
}
+ @Override
public String getStreamPipesHost() {
return streamPipesHost;
}
+ @Override
public Integer getStreamPipesPort() {
return streamPipesPort;
}
- public ObjectMapper getSerializer() {
- return serializer;
- }
-
+ @Override
public boolean isHttpsDisabled() {
return httpsDisabled;
}
-
- public void addDataFormat(SpDataFormatFactory spDataFormatFactory) {
- this.registeredDataFormats.add(spDataFormatFactory);
- }
-
- public List<SpDataFormatFactory> getRegisteredDataFormats() {
- return registeredDataFormats;
- }
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java b/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java
index eff51fa..3b05345 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/util/StreamPipesApiPath.java
@@ -17,21 +17,27 @@
*/
package org.apache.streampipes.client.util;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.StringJoiner;
+import java.util.*;
public class StreamPipesApiPath {
private List<String> pathItems;
private static final List<String> BaseApiPathV2 = Arrays.asList("streampipes-backend", "api", "v2");
+ public static StreamPipesApiPath fromStreamPipesBasePath() {
+ List<String> path = new ArrayList<>(Collections.singletonList("streampipes-backend"));
+ return new StreamPipesApiPath(path);
+ }
+
public static StreamPipesApiPath fromBaseApiPath() {
List<String> initialPaths = new ArrayList<>(BaseApiPathV2);
return new StreamPipesApiPath(initialPaths);
}
+ public static StreamPipesApiPath fromStreamPipesBasePath(String allSubPaths) {
+ return fromStreamPipesBasePath().addToPath(allSubPaths);
+ }
+
private StreamPipesApiPath(List<String> initialPathItems) {
this.pathItems = initialPathItems;
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 785282b..e342506 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -28,7 +28,9 @@ public enum Envs {
SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD"),
SP_INITIAL_CLIENT_USER("SP_INITIAL_SERVICE_USER"),
SP_INITIAL_CLIENT_SECRET("SP_INITIAL_CLIENT_SECRET"),
- SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS");
+ SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS"),
+ SP_CLIENT_USER("SP_CLIENT_USER"),
+ SP_CLIENT_SECRET("SP_CLIENT_SECRET");
private final String envVariableName;
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeNoUserResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeNoUserResourceV3.java
index bc8220e..b05e430 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeNoUserResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeNoUserResourceV3.java
@@ -27,7 +27,7 @@ import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-@Path("/v3/noauth/datalake")
+@Path("/v3/datalake/measure")
public class DataLakeNoUserResourceV3 extends AbstractRestResource {
private DataLakeNoUserManagementV3 dataLakeManagement;
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/PipelineElementDeclarer.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/PipelineElementDeclarer.java
index 73bb9ff..431783a 100644
--- a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/PipelineElementDeclarer.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/declarer/PipelineElementDeclarer.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.sdk.extractor.AbstractParameterExtractor;
import org.apache.streampipes.wrapper.params.binding.BindingParams;
import org.apache.streampipes.wrapper.runtime.PipelineElementRuntime;
+import org.apache.streampipes.wrapper.utils.StreamPipesClientResolver;
public abstract class PipelineElementDeclarer<B extends BindingParams, EPR extends
PipelineElementRuntime, I
@@ -39,7 +40,7 @@ public abstract class PipelineElementDeclarer<B extends BindingParams, EPR exten
elementId = graph.getElementId();
ConfigExtractor configExtractor = makeConfigExtractor(serviceId);
// TODO add StreamPipes Client support
- StreamPipesClient streamPipesClient = null;
+ StreamPipesClient streamPipesClient = new StreamPipesClientResolver().makeStreamPipesClientInstance();
epRuntime = getRuntime(graph, getExtractor(graph), configExtractor, streamPipesClient);
epRuntime.bindRuntime();
return new Response(graph.getElementId(), true);
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/utils/StreamPipesClientResolver.java
similarity index 62%
copy from streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
copy to streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/utils/StreamPipesClientResolver.java
index 68dba7a..63657fd 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/UnauthenticatedInterfaces.java
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/utils/StreamPipesClientResolver.java
@@ -15,24 +15,13 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.backend;
+package org.apache.streampipes.wrapper.utils;
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.streampipes.client.StreamPipesClient;
-public class UnauthenticatedInterfaces {
+public class StreamPipesClientResolver {
- public static Collection<String> get() {
- return Arrays.asList(
- "/api/v2/setup/configured",
- "/api/v2/auth/login",
- "/api/auth/**",
- "/oauth2/**",
- "/api/all",
- "/error",
- "/",
- "/streampipes-backend/",
- "/streampipes-backend/index.html"
- );
+ public StreamPipesClient makeStreamPipesClientInstance() {
+ return StreamPipesClient.create(new StreamPipesClientRuntimeConnectionResolver());
}
}
diff --git a/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/utils/StreamPipesClientRuntimeConnectionResolver.java b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/utils/StreamPipesClientRuntimeConnectionResolver.java
new file mode 100644
index 0000000..149df66
--- /dev/null
+++ b/streampipes-wrapper/src/main/java/org/apache/streampipes/wrapper/utils/StreamPipesClientRuntimeConnectionResolver.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.utils;
+
+import org.apache.streampipes.client.credentials.CredentialsProvider;
+import org.apache.streampipes.client.credentials.StreamPipesTokenCredentials;
+import org.apache.streampipes.client.model.ClientConnectionUrlResolver;
+import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.commons.constants.InstallationConstants;
+import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceGroups;
+import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Supplies client credentials ({@link Envs} values with {@link InstallationConstants}
+ * fallbacks) and a base URL obtained from service discovery.
+ */
+public class StreamPipesClientRuntimeConnectionResolver implements ClientConnectionUrlResolver {
+
+  /** Builds token credentials from the resolved client user and client secret. */
+  @Override
+  public CredentialsProvider getCredentials() {
+    return new StreamPipesTokenCredentials(getClientApiUser(), getClientApiSecret());
+  }
+
+  /**
+   * Returns the first endpoint reported by service discovery.
+   *
+   * @throws IllegalStateException if service discovery reports no endpoint
+   */
+  @Override
+  public String getBaseUrl() {
+    List<String> endpoints = findClientServices();
+    if (endpoints == null || endpoints.isEmpty()) {
+      throw new IllegalStateException("No StreamPipes client endpoint found via service discovery");
+    }
+    return endpoints.get(0);
+  }
+
+  private String getClientApiUser() {
+    return envOrDefault(Envs.SP_CLIENT_USER, InstallationConstants.INITIAL_CLIENT_USER_DEFAULT);
+  }
+
+  private String getClientApiSecret() {
+    return envOrDefault(Envs.SP_CLIENT_SECRET, InstallationConstants.INITIAL_CLIENT_SECRET_DEFAULT);
+  }
+
+  /** Returns {@code env.getValue()} when {@code env.exists()}, otherwise {@code fallback}. */
+  private static String envOrDefault(Envs env, String fallback) {
+    return env.exists() ? env.getValue() : fallback;
+  }
+
+  /** Asks service discovery for endpoints in the CORE group carrying the STREAMPIPES_CLIENT tag. */
+  private List<String> findClientServices() {
+    return SpServiceDiscovery.getServiceDiscovery()
+        .getServiceEndpoints(
+            DefaultSpServiceGroups.CORE,
+            true,
+            Collections.singletonList(DefaultSpServiceTags.STREAMPIPES_CLIENT.asString()));
+  }
+}