You are viewing a plain text version of this content. The canonical (HTML) version, including the original hyperlink, is available in the mailing-list archive.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/03/18 21:43:59 UTC

[incubator-streampipes-extensions] branch dev updated (c0580c1 -> 0f610d6)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git.


    from c0580c1  STREAMPIPES-56: Use explicit annotation in SecretStaticProperty
     new 90a3133  STREAMPIPES-64: REST pull adapter exception
     new 681ddb2  STREAMPIPES-101 Error in Datalake sink if upper case letters are included in runtime names
     new 0f610d6  Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connect/protocol/stream/PullProtocol.java      |  6 +-
 .../sinks/databases/jvm/influxdb/InfluxDb.java     |  4 ++
 .../databases/jvm/influxdb/InfluxDbClient.java     | 16 ++---
 .../databases/jvm/influxdb/InfluxDbController.java |  1 +
 .../sinks/internal/jvm/datalake/DataLake.java      | 12 +++-
 .../internal/jvm/datalake/DataLakeController.java  |  1 +
 ...uxDbClient.java => DataLakeInfluxDbClient.java} | 71 +++++++++-------------
 7 files changed, 56 insertions(+), 55 deletions(-)
 rename streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/{InfluxDbClient.java => DataLakeInfluxDbClient.java} (77%)


[incubator-streampipes-extensions] 01/03: STREAMPIPES-64: REST pull adapter exception

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 90a3133561841b624ff3f2588d7bb48d0a00e517
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Mar 17 22:50:32 2020 +0100

    STREAMPIPES-64: REST pull adapter exception
---
 .../apache/streampipes/connect/protocol/stream/PullProtocol.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/protocol/stream/PullProtocol.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/protocol/stream/PullProtocol.java
index e955f96..6527e54 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/protocol/stream/PullProtocol.java
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/protocol/stream/PullProtocol.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.connect.adapter.model.generic.Protocol;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
 
 import java.io.InputStream;
+import java.net.ConnectException;
 import java.util.concurrent.*;
 
 public abstract class PullProtocol extends Protocol {
@@ -64,8 +65,8 @@ public abstract class PullProtocol extends Protocol {
 
             format.reset();
             SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
-            InputStream data = getDataFromEndpoint();
             try {
+                InputStream data = getDataFromEndpoint();
                 if(data != null) {
                     parser.parse(data, stk);
                 } else {
@@ -75,6 +76,7 @@ public abstract class PullProtocol extends Protocol {
                 logger.error("Error while parsing: " + e.getMessage());
             }
 
+
         };
 
         scheduler = Executors.newScheduledThreadPool(1);
@@ -93,5 +95,5 @@ public abstract class PullProtocol extends Protocol {
         scheduler.shutdownNow();
     }
 
-    abstract InputStream getDataFromEndpoint();
+    abstract InputStream getDataFromEndpoint() throws ParseException;
 }


[incubator-streampipes-extensions] 03/03: Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 0f610d6aac62bde05ff89f9a1cbad76f2bfeb62a
Merge: 681ddb2 c0580c1
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Mar 18 22:43:47 2020 +0100

    Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev

 .../sinks/internal/jvm/notification/NotificationProducer.java          | 3 +++
 1 file changed, 3 insertions(+)


[incubator-streampipes-extensions] 02/03: STREAMPIPES-101 Error in Datalake sink if upper case letters are included in runtime names

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 681ddb2e38db88191e686f25582ec3c0afceff57
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Wed Mar 18 22:43:18 2020 +0100

    STREAMPIPES-101 Error in Datalake sink if upper case letters are included in runtime names
---
 .../sinks/databases/jvm/influxdb/InfluxDb.java     |  4 ++
 .../databases/jvm/influxdb/InfluxDbClient.java     | 16 ++---
 .../databases/jvm/influxdb/InfluxDbController.java |  1 +
 .../sinks/internal/jvm/datalake/DataLake.java      | 12 +++-
 .../internal/jvm/datalake/DataLakeController.java  |  1 +
 ...uxDbClient.java => DataLakeInfluxDbClient.java} | 71 +++++++++-------------
 6 files changed, 52 insertions(+), 53 deletions(-)

diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDb.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDb.java
index 4eca4d7..560aa8b 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDb.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDb.java
@@ -61,4 +61,8 @@ public class InfluxDb implements EventSink<InfluxDbParameters> {
   public void onDetach() throws SpRuntimeException {
     influxDbClient.stop();
   }
+
+  public static String prepareString(String s) {
+    return s.toLowerCase().replaceAll("[^a-zA-Z0-9]", "");
+  }
 }
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDbClient.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDbClient.java
index 2721726..b0e8730 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDbClient.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDbClient.java
@@ -64,7 +64,7 @@ public class InfluxDbClient {
 		this.measureName = measureName;
 		this.user = user;
 		this.password = password;
-		this.timestampField = timestampField;
+		this.timestampField = InfluxDb.prepareString(timestampField);
 		this.batchSize = batchSize;
 		this.flushDuration = flushDuration;
 		this.logger = logger;
@@ -166,20 +166,16 @@ public class InfluxDbClient {
 		Long timestampValue = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
     Point.Builder p = Point.measurement(measureName).time(timestampValue, TimeUnit.MILLISECONDS);
 		for (Map.Entry<String, Object> pair : event.getRaw().entrySet()) {
-      if(!pair.getKey().matches("^[a-zA-Z_][a-zA-Z0-9_]*$")) {
-        throw new SpRuntimeException("Column name '" + pair.getKey() + "' not allowed "
-            + "(allowed: '^[a-zA-Z_][a-zA-Z0-9_]*$')");
-      }
       if (pair.getValue() instanceof Integer) {
-        p.addField(pair.getKey(), (Integer)pair.getValue());
+        p.addField(InfluxDb.prepareString(pair.getKey()), (Integer)pair.getValue());
       } else if (pair.getValue() instanceof Long) {
-        p.addField(pair.getKey(), (Long)pair.getValue());
+        p.addField(InfluxDb.prepareString(pair.getKey()), (Long)pair.getValue());
       } else if (pair.getValue() instanceof Double) {
-        p.addField(pair.getKey(), (Double)pair.getValue());
+        p.addField(InfluxDb.prepareString(pair.getKey()), (Double)pair.getValue());
       } else if (pair.getValue() instanceof Boolean) {
-        p.addField(pair.getKey(), (Boolean)pair.getValue());
+        p.addField(InfluxDb.prepareString(pair.getKey()), (Boolean)pair.getValue());
       } else {
-        p.addField(pair.getKey(), pair.getValue().toString());
+        p.addField(InfluxDb.prepareString(pair.getKey()), pair.getValue().toString());
       }
     }
 
diff --git a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDbController.java b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDbController.java
index 3f098f5..ee8943e 100644
--- a/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDbController.java
+++ b/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/influxdb/InfluxDbController.java
@@ -74,6 +74,7 @@ public class InfluxDbController extends StandaloneEventSinkDeclarer<InfluxDbPara
     Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, Integer.class);
     String dbName = extractor.singleValueParameter(DATABASE_NAME_KEY, String.class);
     String measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
+    measureName = InfluxDb.prepareString(measureName);
     String user = extractor.singleValueParameter(DATABASE_USER_KEY, String.class);
     String password = extractor.secretValue(DATABASE_PASSWORD_KEY);
     String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index b622256..46e47c4 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -46,7 +46,7 @@ import java.util.stream.Collectors;
 public class DataLake implements EventSink<DataLakeParameters> {
 
 
-  private InfluxDbClient influxDbClient;
+  private DataLakeInfluxDbClient influxDbClient;
 
   private static Logger LOG;
 
@@ -62,7 +62,7 @@ public class DataLake implements EventSink<DataLakeParameters> {
 
     this.timestampField = parameters.getTimestampField();
 
-    this.influxDbClient = new InfluxDbClient(
+    this.influxDbClient = new DataLakeInfluxDbClient(
             parameters.getInfluxDbHost(),
             parameters.getInfluxDbPort(),
             parameters.getDatabaseName(),
@@ -77,6 +77,10 @@ public class DataLake implements EventSink<DataLakeParameters> {
     );
 
     EventSchema schema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
+
+    schema.getEventProperties().stream().forEach(eventProperty -> {
+      eventProperty.setRuntimeName(prepareString(eventProperty.getRuntimeName()));
+    });
     registerAtDataLake(parameters.getMeasurementName(), schema);
 
     imageProperties = schema.getEventProperties().stream()
@@ -154,4 +158,8 @@ public class DataLake implements EventSink<DataLakeParameters> {
     }
 
   }
+
+  public static String prepareString(String s) {
+    return s.toLowerCase().replaceAll(" ", "_");
+  }
 }
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
index 744dfd9..aceb897 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
@@ -61,6 +61,7 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
 
 
     String measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
+    measureName = DataLake.prepareString(measureName);
     String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
 
     String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbClient.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
similarity index 77%
rename from streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbClient.java
rename to streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
index 7d33891..d8fe89b 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbClient.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Code is the same as InfluxDB (org.apache.streampipes.sinks.databases.jvm.influxdb) sink. Changes applied here should also be applied in the InfluxDB sink
  */
-public class InfluxDbClient {
+public class DataLakeInfluxDbClient {
 	private Integer influxDbPort;
 	private String influxDbHost;
 	private String databaseName;
@@ -53,17 +53,17 @@ public class InfluxDbClient {
 
 	private InfluxDB influxDb = null;
 
-	InfluxDbClient(String influxDbHost,
-			Integer influxDbPort,
-			String databaseName,
-			String measureName,
-			String user,
-			String password,
-			String timestampField,
-            Integer batchSize,
-            Integer flushDuration,
-			List<String> tagsFields,
-			Logger logger) throws SpRuntimeException {
+	DataLakeInfluxDbClient(String influxDbHost,
+                         Integer influxDbPort,
+                         String databaseName,
+                         String measureName,
+                         String user,
+                         String password,
+                         String timestampField,
+                         Integer batchSize,
+                         Integer flushDuration,
+                         List<String> tagsFields,
+                         Logger logger) throws SpRuntimeException {
 		this.influxDbHost = influxDbHost;
 		this.influxDbPort = influxDbPort;
 		this.databaseName = databaseName;
@@ -81,7 +81,7 @@ public class InfluxDbClient {
 	}
 
   /**
-   * Checks whether the {@link InfluxDbClient#influxDbHost} is valid
+   * Checks whether the {@link DataLakeInfluxDbClient#influxDbHost} is valid
    *
    * @throws SpRuntimeException If the hostname is not valid
    */
@@ -133,7 +133,7 @@ public class InfluxDbClient {
 
   /**
    * Checks whether the given database exists. Needs a working connection to an InfluxDB Server
-   * ({@link InfluxDbClient#influxDb} needs to be initialized)
+   * ({@link DataLakeInfluxDbClient#influxDb} needs to be initialized)
    *
    * @param dbName The name of the database, the method should look for
    * @return True if the database exists, false otherwise
@@ -172,34 +172,23 @@ public class InfluxDbClient {
 		}
 
 		Long timestampValue = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
-        Point.Builder p = Point.measurement(measureName).time(timestampValue, TimeUnit.MILLISECONDS);
-
-        for (Map.Entry<String, Object> pair : event.getRaw().entrySet()) {
-            if (!pair.getKey().matches("^[a-zA-Z_][a-zA-Z0-9_]*$")) {
-                throw new SpRuntimeException("Column name '" + pair.getKey() + "' not allowed "
-                        + "(allowed: '^[a-zA-Z_][a-zA-Z0-9_]*$')");
-            }
-
-
-            if (tagFields != null && tagFields.stream().anyMatch(tag -> tag.equals(pair.getKey()))) {
-                p.tag(pair.getKey(), pair.getValue().toString());
-            } else {
-                if (pair.getValue() instanceof Integer) {
-                    p.addField(pair.getKey(), (Integer)pair.getValue());
-                } else if (pair.getValue() instanceof Long) {
-                    p.addField(pair.getKey(), (Long)pair.getValue());
-                } else if (pair.getValue() instanceof Double) {
-                    p.addField(pair.getKey(), (Double)pair.getValue());
-                } else if (pair.getValue() instanceof Boolean) {
-                    p.addField(pair.getKey(), (Boolean)pair.getValue());
-                } else {
-                    p.addField(pair.getKey(), pair.getValue().toString());
-                }
-            }
-
-    }
+    Point.Builder p = Point.measurement(measureName).time(timestampValue, TimeUnit.MILLISECONDS);
+
+      for (Map.Entry<String, Object> pair : event.getRaw().entrySet()) {
+          if (pair.getValue() instanceof Integer) {
+              p.addField(DataLake.prepareString(pair.getKey()), (Integer)pair.getValue());
+          } else if (pair.getValue() instanceof Long) {
+              p.addField(DataLake.prepareString(pair.getKey()), (Long)pair.getValue());
+          } else if (pair.getValue() instanceof Double) {
+              p.addField(DataLake.prepareString(pair.getKey()), (Double)pair.getValue());
+          } else if (pair.getValue() instanceof Boolean) {
+              p.addField(DataLake.prepareString(pair.getKey()), (Boolean)pair.getValue());
+          } else {
+              p.addField(DataLake.prepareString(pair.getKey()), pair.getValue().toString());
+          }
+      }
 
-    influxDb.write(p.build());
+      influxDb.write(p.build());
 	}
 
   /**