You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/18 20:33:12 UTC
[incubator-streampipes] 06/15: [STREAMPIPES-577] Let JSON parser support adapter preview
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit dcdbe8abb21d56971f2e835edaff3c597ea79c68
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 08:23:54 2022 +0200
[STREAMPIPES-577] Let JSON parser support adapter preview
---
.../apache/streampipes/connect/api/IParser.java | 9 +
.../connect/api/exception/ParseException.java | 6 +
.../format/json/object/JsonObjectParser.java | 135 +++---------
.../connect/adapter/guess/SchemaGuesser.java | 10 +-
.../machine/MachineDataSimulatorUtils.java | 240 +++++++++++----------
.../connect/iiot/protocol/set/FileProtocol.java | 2 +-
.../connect/iiot/protocol/set/HttpProtocol.java | 2 +-
.../iiot/protocol/stream/BrokerProtocol.java | 8 +-
.../iiot/protocol/stream/FileStreamProtocol.java | 2 +-
.../iiot/protocol/stream/HttpStreamProtocol.java | 2 +-
.../model/connect/guess/AdapterGuessInfo.java | 43 ++--
.../sdk/builder/adapter/GuessSchemaBuilder.java | 14 +-
12 files changed, 230 insertions(+), 243 deletions(-)
diff --git a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
index 482bf9fa3..017adb658 100644
--- a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.connect.api;
import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.model.connect.grounding.FormatDescription;
+import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
import org.apache.streampipes.model.schema.EventSchema;
import java.io.InputStream;
@@ -39,4 +40,12 @@ public interface IParser {
* @return
*/
EventSchema getEventSchema(List<byte[]> oneEvent);
+
+ default boolean supportsPreview() {
+ return false;
+ }
+
+ default AdapterGuessInfo getSchemaAndSample(List<byte[]> eventSample) throws ParseException {
+ throw new RuntimeException("Not yet implemented!");
+ }
}
diff --git a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java
index 2064f2903..37c3ed376 100644
--- a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java
@@ -19,10 +19,16 @@
package org.apache.streampipes.connect.api.exception;
public class ParseException extends RuntimeException {
+
public ParseException() {}
public ParseException(String message)
{
super(message);
}
+
+ public ParseException(String message,
+ Throwable throwable) {
+ super(message, throwable);
+ }
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java
index 3419bc90f..06c5cf5f2 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java
@@ -20,24 +20,26 @@ package org.apache.streampipes.connect.adapter.format.json.object;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.api.EmitBinaryEvent;
-import org.apache.streampipes.connect.adapter.model.generic.Parser;
import org.apache.streampipes.connect.adapter.format.util.JsonEventProperty;
+import org.apache.streampipes.connect.adapter.model.generic.Parser;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.model.connect.grounding.FormatDescription;
+import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
+import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class JsonObjectParser extends Parser {
@@ -75,9 +77,17 @@ public class JsonObjectParser extends Parser {
@Override
public EventSchema getEventSchema(List<byte[]> oneEvent) {
- EventSchema resultSchema = new EventSchema();
+ return getSchemaAndSample(oneEvent).getEventSchema();
+ }
+
+ @Override
+ public boolean supportsPreview() {
+ return true;
+ }
-// resultSchema.setEventProperties(Arrays.asList(EpProperties.timestampProperty("timestamp")));
+ @Override
+ public AdapterGuessInfo getSchemaAndSample(List<byte[]> eventSample) throws ParseException {
+ EventSchema resultSchema = new EventSchema();
JsonDataFormatDefinition jsonDefinition = new JsonDataFormatDefinition();
@@ -85,107 +95,22 @@ public class JsonObjectParser extends Parser {
Map<String, Object> exampleEvent = null;
try {
- exampleEvent = jsonDefinition.toMap(oneEvent.get(0));
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
-
- for (Map.Entry<String, Object> entry : exampleEvent.entrySet())
- {
-// System.out.println(entry.getKey() + "/" + entry.getValue());
- EventProperty p = JsonEventProperty.getEventProperty(entry.getKey(), entry.getValue());
+ exampleEvent = jsonDefinition.toMap(eventSample.get(0));
+ var sample = exampleEvent
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
+ new GuessTypeInfo(e.getValue().getClass().getCanonicalName(), e.getValue())));
- resultSchema.addEventProperty(p);
+ for (Map.Entry<String, Object> entry : exampleEvent.entrySet()) {
+ EventProperty p = JsonEventProperty.getEventProperty(entry.getKey(), entry.getValue());
- }
-
- return resultSchema;
- }
-
- public Map<String, Object> parseObject(javax.json.stream.JsonParser jsonParser, boolean root, int start) {
- // this variable is needed to skip the first object start
- String mapKey = "";
- Map<String, Object> result = new HashMap<>();
- List<Object> arr = null;
-
- while (jsonParser.hasNext()) {
- javax.json.stream.JsonParser.Event event = jsonParser.next();
- switch (event) {
- case KEY_NAME:
- mapKey = jsonParser.getString();
- logger.debug("key: " + mapKey );
- break;
- case START_OBJECT:
- if (start == 0) {
- Map<String, Object> ob = parseObject(jsonParser, false, 0);
- if (arr == null) {
- result.put(mapKey, ob);
- } else {
- arr.add(ob);
- }
- } else {
- start--;
- }
- logger.debug("start object");
- break;
- case END_OBJECT:
-
- logger.debug("end object");
- return result;
- case START_ARRAY:
- arr = new ArrayList<>();
- logger.debug("start array");
- break;
- case END_ARRAY:
- // Check if just the end of array is entered
- if (result.keySet().size() == 0 && mapKey.equals("")) {
- return null;
- }
- result.put(mapKey, arr);
- arr = null;
- logger.debug("end array");
- break;
- case VALUE_TRUE:
- if (arr == null) {
- result.put(mapKey, true);
- } else {
- arr.add(true);
- }
- logger.debug("value: true");
- break;
- case VALUE_FALSE:
- if (arr == null) {
- result.put(mapKey, false);
- } else {
- arr.add(false);
- }
- logger.debug("value: false");
- break;
- case VALUE_STRING:
- if (arr == null) {
- result.put(mapKey, jsonParser.getString());
- } else {
- arr.add(jsonParser.getString());
- }
- logger.debug("value string: " + jsonParser.getString());
- break;
- case VALUE_NUMBER:
- if (arr == null) {
- result.put(mapKey, jsonParser.getBigDecimal());
- } else {
- arr.add(jsonParser.getBigDecimal());
- }
- logger.debug("value number: " + jsonParser.getBigDecimal());
- break;
- case VALUE_NULL:
- logger.debug("value null");
- break;
- default:
- logger.error("Error: " + event + " event is not handled in the JSON parser");
- break;
+ resultSchema.addEventProperty(p);
}
- }
- return result;
+ return new AdapterGuessInfo(resultSchema, sample);
+ } catch (SpRuntimeException e) {
+ throw new ParseException("Could not serialize event, did you choose the correct format?", e);
+ }
}
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/guess/SchemaGuesser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/guess/SchemaGuesser.java
index 53be8abb2..29f534e97 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/guess/SchemaGuesser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/guess/SchemaGuesser.java
@@ -18,16 +18,24 @@
package org.apache.streampipes.connect.adapter.guess;
+import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventSchema;
public class SchemaGuesser {
- public static GuessSchema guessSchma(EventSchema eventSchema) {
+ public static GuessSchema guessSchema(EventSchema eventSchema) {
GuessSchema result = new GuessSchema();
result.setEventSchema(eventSchema);
return result;
}
+
+ public static GuessSchema guessSchema(AdapterGuessInfo guessInfo) {
+ var result = guessSchema(guessInfo.getEventSchema());
+ result.setEventPreview(guessInfo.getEventPreview());
+
+ return result;
+ }
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulatorUtils.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulatorUtils.java
index 0d7b2af3d..f157babca 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulatorUtils.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulatorUtils.java
@@ -30,123 +30,137 @@ import static org.apache.streampipes.sdk.helpers.EpProperties.*;
public class MachineDataSimulatorUtils {
- // Vocabulary
- public static final String NS = "https://streampipes.org/vocabulary/examples/watertank/v1/";
- public static final String HAS_SENSOR_ID = NS + "hasSensorId";
+ // Vocabulary
+ public static final String NS = "https://streampipes.org/vocabulary/examples/watertank/v1/";
+ public static final String HAS_SENSOR_ID = NS + "hasSensorId";
- private static final String TIMESTAMP = "timestamp";
- private static final String SENSOR_ID = "sensorId";
- private static final String MASS_FLOW = "mass_flow";
- private static final String TEMPERATURE = "temperature";
+ private static final String TIMESTAMP = "timestamp";
+ private static final String SENSOR_ID = "sensorId";
+ private static final String MASS_FLOW = "mass_flow";
+ private static final String TEMPERATURE = "temperature";
- public static GuessSchema getSchema(String selectedSimulatorOption) throws AdapterException {
- switch(selectedSimulatorOption) {
- case "flowrate":
- return getFlowrateSchema();
- case "pressure":
- return getPressureSchema();
- case "waterlevel":
- return getWaterlevelSchema();
- default:
- throw new AdapterException("resource not found");
- }
+ public static GuessSchema getSchema(String selectedSimulatorOption) throws AdapterException {
+ switch (selectedSimulatorOption) {
+ case "flowrate":
+ return getFlowrateSchema();
+ case "pressure":
+ return getPressureSchema();
+ case "waterlevel":
+ return getWaterlevelSchema();
+ default:
+ throw new AdapterException("resource not found");
}
+ }
- private static GuessSchema getWaterlevelSchema() {
- return GuessSchemaBuilder.create()
- .property(timestampProperty(TIMESTAMP))
- .property(PrimitivePropertyBuilder
- .create(Datatypes.String, "sensorId")
- .label("Sensor ID")
- .description("The ID of the sensor")
- .domainProperty(HAS_SENSOR_ID)
- .scope(PropertyScope.DIMENSION_PROPERTY)
- .build())
- .property(PrimitivePropertyBuilder
- .create(Datatypes.Float, "level")
- .label("Water Level")
- .description("Denotes the current water level in the container")
- .domainProperty(SO.Number)
- .scope(PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .property(PrimitivePropertyBuilder
- .create(Datatypes.Boolean, "overflow")
- .label("Overflow")
- .description("Indicates whether the tank overflows")
- .domainProperty(SO.Number)
- .scope(PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .build();
- }
+ private static GuessSchema getWaterlevelSchema() {
+ return GuessSchemaBuilder.create()
+ .property(timestampProperty(TIMESTAMP))
+ .sample(TIMESTAMP, System.currentTimeMillis())
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.String, "sensorId")
+ .label("Sensor ID")
+ .description("The ID of the sensor")
+ .domainProperty(HAS_SENSOR_ID)
+ .scope(PropertyScope.DIMENSION_PROPERTY)
+ .build())
+ .sample("sensorId", "sensor01")
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.Float, "level")
+ .label("Water Level")
+ .description("Denotes the current water level in the container")
+ .domainProperty(SO.Number)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+ .sample("level", 5.25f)
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.Boolean, "overflow")
+ .label("Overflow")
+ .description("Indicates whether the tank overflows")
+ .domainProperty(SO.Number)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+ .sample("overflow", true)
+ .build();
+ }
- private static GuessSchema getPressureSchema() {
- return GuessSchemaBuilder.create()
- .property(timestampProperty(TIMESTAMP))
- .property(PrimitivePropertyBuilder
- .create(Datatypes.String, "sensorId")
- .label("Sensor ID")
- .description("The ID of the sensor")
- .domainProperty(HAS_SENSOR_ID)
- .scope(PropertyScope.DIMENSION_PROPERTY)
- .build())
- .property(PrimitivePropertyBuilder
- .create(Datatypes.Float, "pressure")
- .label("Pressure")
- .description("Denotes the current pressure in the pressure tank")
- .domainProperty(SO.Number)
- .valueSpecification(0.0f, 100.0f, 0.5f)
- .scope(PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .build();
- }
+ private static GuessSchema getPressureSchema() {
+ return GuessSchemaBuilder.create()
+ .property(timestampProperty(TIMESTAMP))
+ .sample(TIMESTAMP, System.currentTimeMillis())
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.String, "sensorId")
+ .label("Sensor ID")
+ .description("The ID of the sensor")
+ .domainProperty(HAS_SENSOR_ID)
+ .scope(PropertyScope.DIMENSION_PROPERTY)
+ .build())
+ .sample("sensorId", "sensor01")
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.Float, "pressure")
+ .label("Pressure")
+ .description("Denotes the current pressure in the pressure tank")
+ .domainProperty(SO.Number)
+ .valueSpecification(0.0f, 100.0f, 0.5f)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+ .sample("pressure", 85.22f)
+ .build();
+ }
- public static GuessSchema getFlowrateSchema() {
- return GuessSchemaBuilder.create()
- .property(timestampProperty(TIMESTAMP))
- .property(PrimitivePropertyBuilder
- .create(Datatypes.String, SENSOR_ID)
- .label("Sensor ID")
- .description("The ID of the sensor")
- .domainProperty(HAS_SENSOR_ID)
- .scope(PropertyScope.DIMENSION_PROPERTY)
- .build())
- .property(PrimitivePropertyBuilder
- .create(Datatypes.Float, MASS_FLOW)
- .label("Mass Flow")
- .description("Denotes the current mass flow in the sensor")
- .domainProperty(SO.Number)
- .scope(PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .property(PrimitivePropertyBuilder
- .create(Datatypes.Float, "volume_flow")
- .label("Volume Flow")
- .description("Denotes the current volume flow")
- .domainProperty(SO.Number)
- .scope(PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .property(PrimitivePropertyBuilder
- .create(Datatypes.Float, TEMPERATURE)
- .label("Temperature")
- .description("Denotes the current temperature in degrees celsius")
- .domainProperty(SO.Number)
- .scope(PropertyScope.MEASUREMENT_PROPERTY)
- .measurementUnit(URI.create("http://codes.wmo.int/common/unit/degC"))
- .valueSpecification(0.0f, 100.0f, 0.1f)
- .build())
- .property(PrimitivePropertyBuilder
- .create(Datatypes.Float, "density")
- .label("Density")
- .description("Denotes the current density of the fluid")
- .domainProperty(SO.Number)
- .scope(PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .property(PrimitivePropertyBuilder
- .create(Datatypes.Boolean, "sensor_fault_flags")
- .label("Sensor Fault Flags")
- .description("Any fault flags of the sensors")
- .domainProperty(SO.Boolean)
- .scope(PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .build();
- }
+ public static GuessSchema getFlowrateSchema() {
+ return GuessSchemaBuilder.create()
+ .property(timestampProperty(TIMESTAMP))
+ .sample(TIMESTAMP, System.currentTimeMillis())
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.String, SENSOR_ID)
+ .label("Sensor ID")
+ .description("The ID of the sensor")
+ .domainProperty(HAS_SENSOR_ID)
+ .scope(PropertyScope.DIMENSION_PROPERTY)
+ .build())
+ .sample("sensorId", "sensor01")
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.Float, MASS_FLOW)
+ .label("Mass Flow")
+ .description("Denotes the current mass flow in the sensor")
+ .domainProperty(SO.Number)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+ .sample(MASS_FLOW, 5.76f)
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.Float, "volume_flow")
+ .label("Volume Flow")
+ .description("Denotes the current volume flow")
+ .domainProperty(SO.Number)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+ .sample("volume_flow", 3.34f)
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.Float, TEMPERATURE)
+ .label("Temperature")
+ .description("Denotes the current temperature in degrees celsius")
+ .domainProperty(SO.Number)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .measurementUnit(URI.create("http://codes.wmo.int/common/unit/degC"))
+ .valueSpecification(0.0f, 100.0f, 0.1f)
+ .build())
+ .sample(TEMPERATURE, 33.221f)
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.Float, "density")
+ .label("Density")
+ .description("Denotes the current density of the fluid")
+ .domainProperty(SO.Number)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+ .sample("density", 5.0f)
+ .property(PrimitivePropertyBuilder
+ .create(Datatypes.Boolean, "sensor_fault_flags")
+ .label("Sensor Fault Flags")
+ .description("Any fault flags of the sensors")
+ .domainProperty(SO.Boolean)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+ .sample("sensor_fault_flags", true)
+ .build();
+ }
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
index 25976de44..31d4fc5bf 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
@@ -123,7 +123,7 @@ public class FileProtocol extends Protocol {
EventSchema eventSchema = parser.getEventSchema(dataByte);
- GuessSchema result = SchemaGuesser.guessSchma(eventSchema);
+ GuessSchema result = SchemaGuesser.guessSchema(eventSchema);
return result;
} catch (FileNotFoundException e) {
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/HttpProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/HttpProtocol.java
index 1feb4cceb..eca0f25f7 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/HttpProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/HttpProtocol.java
@@ -116,7 +116,7 @@ public class HttpProtocol extends Protocol {
EventSchema eventSchema= parser.getEventSchema(dataByte);
- GuessSchema result = SchemaGuesser.guessSchma(eventSchema);
+ GuessSchema result = SchemaGuesser.guessSchema(eventSchema);
return result;
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/BrokerProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/BrokerProtocol.java
index 835fffc72..e34d7f941 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/BrokerProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/BrokerProtocol.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.connect.api.IFormat;
import org.apache.streampipes.connect.api.IParser;
import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventSchema;
import java.util.ArrayList;
import java.util.List;
@@ -48,9 +47,12 @@ public abstract class BrokerProtocol extends Protocol {
public GuessSchema getGuessSchema() throws ParseException {
List<byte[]> eventByte = getNByteElements(1);
- EventSchema eventSchema = parser.getEventSchema(eventByte);
- return SchemaGuesser.guessSchma(eventSchema);
+ if (parser.supportsPreview()) {
+ return SchemaGuesser.guessSchema(parser.getSchemaAndSample(eventByte));
+ } else {
+ return SchemaGuesser.guessSchema(parser.getEventSchema(eventByte));
+ }
}
@Override
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 4b9368b0b..d82d28ce9 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -213,7 +213,7 @@ public class FileStreamProtocol extends Protocol {
EventSchema eventSchema = parser.getEventSchema(dataByte);
- GuessSchema result = SchemaGuesser.guessSchma(eventSchema);
+ GuessSchema result = SchemaGuesser.guessSchema(eventSchema);
return result;
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
index 39a5dad26..be6882957 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
@@ -112,7 +112,7 @@ public class HttpStreamProtocol extends PullProtocol {
dataByte.addAll(dataByte);
}
EventSchema eventSchema= parser.getEventSchema(dataByte);
- GuessSchema result = SchemaGuesser.guessSchma(eventSchema);
+ GuessSchema result = SchemaGuesser.guessSchema(eventSchema);
return result;
}
diff --git a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/AdapterGuessInfo.java
similarity index 51%
copy from streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/AdapterGuessInfo.java
index 482bf9fa3..6aa137fbd 100644
--- a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/AdapterGuessInfo.java
@@ -15,28 +15,41 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.connect.api;
-import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.model.connect.grounding.FormatDescription;
+package org.apache.streampipes.model.connect.guess;
+
import org.apache.streampipes.model.schema.EventSchema;
-import java.io.InputStream;
import java.util.List;
+import java.util.Map;
+
+public class AdapterGuessInfo {
+
+ private EventSchema eventSchema;
+ private List<Map<String, GuessTypeInfo>> eventPreview;
+
+ public AdapterGuessInfo() {
+ }
-public interface IParser {
+ public AdapterGuessInfo(EventSchema eventSchema,
+ Map<String, GuessTypeInfo> singleEventSample) {
+ this.eventSchema = eventSchema;
+ this.eventPreview = List.of(singleEventSample);
+ }
- IParser getInstance(FormatDescription formatDescription);
+ public EventSchema getEventSchema() {
+ return eventSchema;
+ }
- void parse(InputStream data, EmitBinaryEvent emitBinaryEvent) throws ParseException;
+ public void setEventSchema(EventSchema eventSchema) {
+ this.eventSchema = eventSchema;
+ }
- List<byte[]> parseNEvents(InputStream data, int n) throws ParseException;
+ public List<Map<String, GuessTypeInfo>> getEventPreview() {
+ return eventPreview;
+ }
- /**
- * Pass one event to Parser to get the event schema
- *
- * @param oneEvent
- * @return
- */
- EventSchema getEventSchema(List<byte[]> oneEvent);
+ public void setEventPreview(List<Map<String, GuessTypeInfo>> eventPreview) {
+ this.eventPreview = eventPreview;
+ }
}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/GuessSchemaBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/GuessSchemaBuilder.java
index 91e0da129..8f88a54eb 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/GuessSchemaBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/GuessSchemaBuilder.java
@@ -18,18 +18,20 @@
package org.apache.streampipes.sdk.builder.adapter;
import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
public class GuessSchemaBuilder {
private List<EventProperty> eventProperties;
+ private Map<String, GuessTypeInfo> samples;
private GuessSchemaBuilder() {
this.eventProperties = new ArrayList<>();
+ this.samples = new HashMap<>();
}
/**
@@ -39,6 +41,13 @@ public class GuessSchemaBuilder {
return new GuessSchemaBuilder();
}
+ public GuessSchemaBuilder sample(String runtimeName,
+ Object sampleValue) {
+ this.samples.put(runtimeName, new GuessTypeInfo(sampleValue.getClass().getCanonicalName(), sampleValue));
+
+ return this;
+ }
+
public GuessSchemaBuilder property(EventProperty property) {
this.eventProperties.add(property);
@@ -56,6 +65,7 @@ public class GuessSchemaBuilder {
eventSchema.setEventProperties(eventProperties);
guessSchema.setEventSchema(eventSchema);
+ guessSchema.setEventPreview(List.of(this.samples));
return guessSchema;
}