You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/17 06:24:06 UTC

[incubator-streampipes] branch STREAMPIPES-577 updated: [STREAMPIPES-577] Let JSON parser support adapter preview

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-577
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-577 by this push:
     new 8aca01778 [STREAMPIPES-577] Let JSON parser support adapter preview
8aca01778 is described below

commit 8aca0177845b5f5175dab1d20189e371fe2b05d2
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 08:23:54 2022 +0200

    [STREAMPIPES-577] Let JSON parser support adapter preview
---
 .../apache/streampipes/connect/api/IParser.java    |   9 +
 .../connect/api/exception/ParseException.java      |   6 +
 .../format/json/object/JsonObjectParser.java       | 135 +++---------
 .../connect/adapter/guess/SchemaGuesser.java       |  10 +-
 .../machine/MachineDataSimulatorUtils.java         | 240 +++++++++++----------
 .../connect/iiot/protocol/set/FileProtocol.java    |   2 +-
 .../connect/iiot/protocol/set/HttpProtocol.java    |   2 +-
 .../iiot/protocol/stream/BrokerProtocol.java       |   8 +-
 .../iiot/protocol/stream/FileStreamProtocol.java   |   2 +-
 .../iiot/protocol/stream/HttpStreamProtocol.java   |   2 +-
 .../model/connect/guess/AdapterGuessInfo.java      |  43 ++--
 .../sdk/builder/adapter/GuessSchemaBuilder.java    |  14 +-
 12 files changed, 230 insertions(+), 243 deletions(-)

diff --git a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
index 482bf9fa3..017adb658 100644
--- a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.connect.api;
 
 import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
+import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
 import org.apache.streampipes.model.schema.EventSchema;
 
 import java.io.InputStream;
@@ -39,4 +40,12 @@ public interface IParser {
    * @return
    */
   EventSchema getEventSchema(List<byte[]> oneEvent);
+
+  default boolean supportsPreview() {
+    return false;
+  }
+
+  default AdapterGuessInfo getSchemaAndSample(List<byte[]> eventSample) throws ParseException {
+    throw new RuntimeException("Not yet implemented!");
+  }
 }
diff --git a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java
index 2064f2903..37c3ed376 100644
--- a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/exception/ParseException.java
@@ -19,10 +19,16 @@
 package org.apache.streampipes.connect.api.exception;
 
 public class ParseException extends RuntimeException {
+
     public ParseException() {}
 
     public ParseException(String message)
     {
         super(message);
     }
+
+    public ParseException(String message,
+                          Throwable throwable) {
+        super(message, throwable);
+    }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java
index 3419bc90f..06c5cf5f2 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/json/object/JsonObjectParser.java
@@ -20,24 +20,26 @@ package org.apache.streampipes.connect.adapter.format.json.object;
 
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.connect.api.EmitBinaryEvent;
-import org.apache.streampipes.connect.adapter.model.generic.Parser;
 import org.apache.streampipes.connect.adapter.format.util.JsonEventProperty;
+import org.apache.streampipes.connect.adapter.model.generic.Parser;
+import org.apache.streampipes.connect.api.EmitBinaryEvent;
 import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
+import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
+import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class JsonObjectParser extends Parser {
 
@@ -75,9 +77,17 @@ public class JsonObjectParser extends Parser {
 
     @Override
     public EventSchema getEventSchema(List<byte[]> oneEvent) {
-        EventSchema resultSchema = new EventSchema();
+        return getSchemaAndSample(oneEvent).getEventSchema();
+    }
+
+    @Override
+    public boolean supportsPreview() {
+        return true;
+    }
 
-//        resultSchema.setEventProperties(Arrays.asList(EpProperties.timestampProperty("timestamp")));
+    @Override
+    public AdapterGuessInfo getSchemaAndSample(List<byte[]> eventSample) throws ParseException {
+        EventSchema resultSchema = new EventSchema();
 
         JsonDataFormatDefinition jsonDefinition = new JsonDataFormatDefinition();
 
@@ -85,107 +95,22 @@ public class JsonObjectParser extends Parser {
         Map<String, Object> exampleEvent = null;
 
         try {
-            exampleEvent = jsonDefinition.toMap(oneEvent.get(0));
-        } catch (SpRuntimeException e) {
-            e.printStackTrace();
-        }
-
-        for (Map.Entry<String, Object> entry : exampleEvent.entrySet())
-        {
-//            System.out.println(entry.getKey() + "/" + entry.getValue());
-            EventProperty p = JsonEventProperty.getEventProperty(entry.getKey(), entry.getValue());
+            exampleEvent = jsonDefinition.toMap(eventSample.get(0));
+            var sample = exampleEvent
+              .entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e ->
+                new GuessTypeInfo(e.getValue().getClass().getCanonicalName(), e.getValue())));
 
-            resultSchema.addEventProperty(p);
+            for (Map.Entry<String, Object> entry : exampleEvent.entrySet()) {
+                EventProperty p = JsonEventProperty.getEventProperty(entry.getKey(), entry.getValue());
 
-        }
-
-        return resultSchema;
-    }
-
-    public Map<String, Object> parseObject(javax.json.stream.JsonParser jsonParser, boolean root, int start) {
-        // this variable is needed to skip the first object start
-        String mapKey = "";
-        Map<String, Object> result = new HashMap<>();
-        List<Object> arr = null;
-
-        while (jsonParser.hasNext()) {
-            javax.json.stream.JsonParser.Event event = jsonParser.next();
-            switch (event) {
-                case KEY_NAME:
-                    mapKey = jsonParser.getString();
-                    logger.debug("key: " + mapKey );
-                    break;
-                case START_OBJECT:
-                    if (start == 0) {
-                        Map<String, Object> ob = parseObject(jsonParser, false, 0);
-                        if (arr == null) {
-                            result.put(mapKey, ob);
-                        } else {
-                            arr.add(ob);
-                        }
-                    } else {
-                        start--;
-                    }
-                    logger.debug("start object");
-                    break;
-                case END_OBJECT:
-
-                    logger.debug("end object");
-                    return result;
-                case START_ARRAY:
-                    arr = new ArrayList<>();
-                    logger.debug("start array");
-                    break;
-                case END_ARRAY:
-                    // Check if just the end of array is entered
-                    if (result.keySet().size() == 0 && mapKey.equals("")) {
-                        return null;
-                    }
-                    result.put(mapKey, arr);
-                    arr = null;
-                    logger.debug("end array");
-                    break;
-                case VALUE_TRUE:
-                    if (arr == null) {
-                        result.put(mapKey, true);
-                    } else {
-                        arr.add(true);
-                    }
-                    logger.debug("value: true");
-                    break;
-                case VALUE_FALSE:
-                    if (arr == null) {
-                        result.put(mapKey, false);
-                    } else {
-                        arr.add(false);
-                    }
-                    logger.debug("value: false");
-                    break;
-                case VALUE_STRING:
-                    if (arr == null) {
-                        result.put(mapKey, jsonParser.getString());
-                    } else {
-                        arr.add(jsonParser.getString());
-                    }
-                    logger.debug("value string: " + jsonParser.getString());
-                    break;
-                case VALUE_NUMBER:
-                    if (arr == null) {
-                        result.put(mapKey, jsonParser.getBigDecimal());
-                    } else {
-                        arr.add(jsonParser.getBigDecimal());
-                    }
-                    logger.debug("value number: " + jsonParser.getBigDecimal());
-                    break;
-                case VALUE_NULL:
-                    logger.debug("value null");
-                    break;
-                default:
-                    logger.error("Error: " + event + " event is not handled in the JSON parser");
-                    break;
+                resultSchema.addEventProperty(p);
             }
-        }
 
-        return result;
+            return new AdapterGuessInfo(resultSchema, sample);
+        } catch (SpRuntimeException e) {
+            throw new ParseException("Could not serialize event, did you choose the correct format?", e);
+        }
     }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/guess/SchemaGuesser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/guess/SchemaGuesser.java
index 53be8abb2..29f534e97 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/guess/SchemaGuesser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/guess/SchemaGuesser.java
@@ -18,16 +18,24 @@
 
 package org.apache.streampipes.connect.adapter.guess;
 
+import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.schema.EventSchema;
 
 public class SchemaGuesser {
 
-    public static GuessSchema guessSchma(EventSchema eventSchema) {
+    public static GuessSchema guessSchema(EventSchema eventSchema) {
         GuessSchema result = new GuessSchema();
 
         result.setEventSchema(eventSchema);
 
         return result;
     }
+
+    public static GuessSchema guessSchema(AdapterGuessInfo guessInfo) {
+        var result = guessSchema(guessInfo.getEventSchema());
+        result.setEventPreview(guessInfo.getEventPreview());
+
+        return result;
+    }
 }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulatorUtils.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulatorUtils.java
index 0d7b2af3d..f157babca 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulatorUtils.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/simulator/machine/MachineDataSimulatorUtils.java
@@ -30,123 +30,137 @@ import static org.apache.streampipes.sdk.helpers.EpProperties.*;
 
 public class MachineDataSimulatorUtils {
 
-    // Vocabulary
-    public static final String NS = "https://streampipes.org/vocabulary/examples/watertank/v1/";
-    public static final String HAS_SENSOR_ID = NS + "hasSensorId";
+  // Vocabulary
+  public static final String NS = "https://streampipes.org/vocabulary/examples/watertank/v1/";
+  public static final String HAS_SENSOR_ID = NS + "hasSensorId";
 
-    private static final String TIMESTAMP = "timestamp";
-    private static final String SENSOR_ID = "sensorId";
-    private static final String MASS_FLOW = "mass_flow";
-    private static final String TEMPERATURE = "temperature";
+  private static final String TIMESTAMP = "timestamp";
+  private static final String SENSOR_ID = "sensorId";
+  private static final String MASS_FLOW = "mass_flow";
+  private static final String TEMPERATURE = "temperature";
 
-    public static GuessSchema getSchema(String selectedSimulatorOption) throws AdapterException {
-        switch(selectedSimulatorOption) {
-            case "flowrate":
-                return getFlowrateSchema();
-            case "pressure":
-                return getPressureSchema();
-            case "waterlevel":
-                return getWaterlevelSchema();
-            default:
-                throw new AdapterException("resource not found");
-        }
+  public static GuessSchema getSchema(String selectedSimulatorOption) throws AdapterException {
+    switch (selectedSimulatorOption) {
+      case "flowrate":
+        return getFlowrateSchema();
+      case "pressure":
+        return getPressureSchema();
+      case "waterlevel":
+        return getWaterlevelSchema();
+      default:
+        throw new AdapterException("resource not found");
     }
+  }
 
-    private static GuessSchema getWaterlevelSchema() {
-        return GuessSchemaBuilder.create()
-                .property(timestampProperty(TIMESTAMP))
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.String, "sensorId")
-                        .label("Sensor ID")
-                        .description("The ID of the sensor")
-                        .domainProperty(HAS_SENSOR_ID)
-                        .scope(PropertyScope.DIMENSION_PROPERTY)
-                        .build())
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.Float, "level")
-                        .label("Water Level")
-                        .description("Denotes the current water level in the container")
-                        .domainProperty(SO.Number)
-                        .scope(PropertyScope.MEASUREMENT_PROPERTY)
-                        .build())
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.Boolean, "overflow")
-                        .label("Overflow")
-                        .description("Indicates whether the tank overflows")
-                        .domainProperty(SO.Number)
-                        .scope(PropertyScope.MEASUREMENT_PROPERTY)
-                        .build())
-                .build();
-    }
+  private static GuessSchema getWaterlevelSchema() {
+    return GuessSchemaBuilder.create()
+      .property(timestampProperty(TIMESTAMP))
+      .sample(TIMESTAMP, System.currentTimeMillis())
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.String, "sensorId")
+        .label("Sensor ID")
+        .description("The ID of the sensor")
+        .domainProperty(HAS_SENSOR_ID)
+        .scope(PropertyScope.DIMENSION_PROPERTY)
+        .build())
+      .sample("sensorId", "sensor01")
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.Float, "level")
+        .label("Water Level")
+        .description("Denotes the current water level in the container")
+        .domainProperty(SO.Number)
+        .scope(PropertyScope.MEASUREMENT_PROPERTY)
+        .build())
+      .sample("level", 5.25f)
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.Boolean, "overflow")
+        .label("Overflow")
+        .description("Indicates whether the tank overflows")
+        .domainProperty(SO.Number)
+        .scope(PropertyScope.MEASUREMENT_PROPERTY)
+        .build())
+      .sample("overflow", true)
+      .build();
+  }
 
-    private static GuessSchema getPressureSchema() {
-        return GuessSchemaBuilder.create()
-                .property(timestampProperty(TIMESTAMP))
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.String, "sensorId")
-                        .label("Sensor ID")
-                        .description("The ID of the sensor")
-                        .domainProperty(HAS_SENSOR_ID)
-                        .scope(PropertyScope.DIMENSION_PROPERTY)
-                        .build())
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.Float, "pressure")
-                        .label("Pressure")
-                        .description("Denotes the current pressure in the pressure tank")
-                        .domainProperty(SO.Number)
-                        .valueSpecification(0.0f, 100.0f, 0.5f)
-                        .scope(PropertyScope.MEASUREMENT_PROPERTY)
-                        .build())
-                .build();
-    }
+  private static GuessSchema getPressureSchema() {
+    return GuessSchemaBuilder.create()
+      .property(timestampProperty(TIMESTAMP))
+      .sample(TIMESTAMP, System.currentTimeMillis())
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.String, "sensorId")
+        .label("Sensor ID")
+        .description("The ID of the sensor")
+        .domainProperty(HAS_SENSOR_ID)
+        .scope(PropertyScope.DIMENSION_PROPERTY)
+        .build())
+      .sample("sensorId", "sensor01")
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.Float, "pressure")
+        .label("Pressure")
+        .description("Denotes the current pressure in the pressure tank")
+        .domainProperty(SO.Number)
+        .valueSpecification(0.0f, 100.0f, 0.5f)
+        .scope(PropertyScope.MEASUREMENT_PROPERTY)
+        .build())
+      .sample("pressure", 85.22f)
+      .build();
+  }
 
-    public static GuessSchema getFlowrateSchema() {
-        return GuessSchemaBuilder.create()
-                .property(timestampProperty(TIMESTAMP))
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.String, SENSOR_ID)
-                        .label("Sensor ID")
-                        .description("The ID of the sensor")
-                        .domainProperty(HAS_SENSOR_ID)
-                        .scope(PropertyScope.DIMENSION_PROPERTY)
-                        .build())
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.Float, MASS_FLOW)
-                        .label("Mass Flow")
-                        .description("Denotes the current mass flow in the sensor")
-                        .domainProperty(SO.Number)
-                        .scope(PropertyScope.MEASUREMENT_PROPERTY)
-                        .build())
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.Float, "volume_flow")
-                        .label("Volume Flow")
-                        .description("Denotes the current volume flow")
-                        .domainProperty(SO.Number)
-                        .scope(PropertyScope.MEASUREMENT_PROPERTY)
-                        .build())
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.Float, TEMPERATURE)
-                        .label("Temperature")
-                        .description("Denotes the current temperature in degrees celsius")
-                        .domainProperty(SO.Number)
-                        .scope(PropertyScope.MEASUREMENT_PROPERTY)
-                        .measurementUnit(URI.create("http://codes.wmo.int/common/unit/degC"))
-                        .valueSpecification(0.0f, 100.0f, 0.1f)
-                        .build())
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.Float, "density")
-                        .label("Density")
-                        .description("Denotes the current density of the fluid")
-                        .domainProperty(SO.Number)
-                        .scope(PropertyScope.MEASUREMENT_PROPERTY)
-                        .build())
-                .property(PrimitivePropertyBuilder
-                        .create(Datatypes.Boolean, "sensor_fault_flags")
-                        .label("Sensor Fault Flags")
-                        .description("Any fault flags of the sensors")
-                        .domainProperty(SO.Boolean)
-                        .scope(PropertyScope.MEASUREMENT_PROPERTY)
-                        .build())
-                .build();
-    }
+  public static GuessSchema getFlowrateSchema() {
+    return GuessSchemaBuilder.create()
+      .property(timestampProperty(TIMESTAMP))
+      .sample(TIMESTAMP, System.currentTimeMillis())
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.String, SENSOR_ID)
+        .label("Sensor ID")
+        .description("The ID of the sensor")
+        .domainProperty(HAS_SENSOR_ID)
+        .scope(PropertyScope.DIMENSION_PROPERTY)
+        .build())
+      .sample("sensorId", "sensor01")
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.Float, MASS_FLOW)
+        .label("Mass Flow")
+        .description("Denotes the current mass flow in the sensor")
+        .domainProperty(SO.Number)
+        .scope(PropertyScope.MEASUREMENT_PROPERTY)
+        .build())
+      .sample(MASS_FLOW, 5.76f)
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.Float, "volume_flow")
+        .label("Volume Flow")
+        .description("Denotes the current volume flow")
+        .domainProperty(SO.Number)
+        .scope(PropertyScope.MEASUREMENT_PROPERTY)
+        .build())
+      .sample("volume_flow", 3.34f)
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.Float, TEMPERATURE)
+        .label("Temperature")
+        .description("Denotes the current temperature in degrees celsius")
+        .domainProperty(SO.Number)
+        .scope(PropertyScope.MEASUREMENT_PROPERTY)
+        .measurementUnit(URI.create("http://codes.wmo.int/common/unit/degC"))
+        .valueSpecification(0.0f, 100.0f, 0.1f)
+        .build())
+      .sample(TEMPERATURE, 33.221f)
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.Float, "density")
+        .label("Density")
+        .description("Denotes the current density of the fluid")
+        .domainProperty(SO.Number)
+        .scope(PropertyScope.MEASUREMENT_PROPERTY)
+        .build())
+      .sample("density", 5.0f)
+      .property(PrimitivePropertyBuilder
+        .create(Datatypes.Boolean, "sensor_fault_flags")
+        .label("Sensor Fault Flags")
+        .description("Any fault flags of the sensors")
+        .domainProperty(SO.Boolean)
+        .scope(PropertyScope.MEASUREMENT_PROPERTY)
+        .build())
+      .sample("sensor_fault_flags", true)
+      .build();
+  }
 }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
index 25976de44..31d4fc5bf 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
@@ -123,7 +123,7 @@ public class FileProtocol extends Protocol {
 
             EventSchema eventSchema = parser.getEventSchema(dataByte);
 
-            GuessSchema result = SchemaGuesser.guessSchma(eventSchema);
+            GuessSchema result = SchemaGuesser.guessSchema(eventSchema);
 
             return result;
         } catch (FileNotFoundException e) {
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/HttpProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/HttpProtocol.java
index 1feb4cceb..eca0f25f7 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/HttpProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/HttpProtocol.java
@@ -116,7 +116,7 @@ public class HttpProtocol extends Protocol {
 
         EventSchema eventSchema= parser.getEventSchema(dataByte);
 
-        GuessSchema result = SchemaGuesser.guessSchma(eventSchema);
+        GuessSchema result = SchemaGuesser.guessSchema(eventSchema);
 
         return result;
     }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/BrokerProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/BrokerProtocol.java
index 835fffc72..e34d7f941 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/BrokerProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/BrokerProtocol.java
@@ -23,7 +23,6 @@ import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.api.IParser;
 import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventSchema;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -48,9 +47,12 @@ public abstract class BrokerProtocol extends Protocol {
   public GuessSchema getGuessSchema() throws ParseException {
 
     List<byte[]> eventByte = getNByteElements(1);
-    EventSchema eventSchema = parser.getEventSchema(eventByte);
 
-    return SchemaGuesser.guessSchma(eventSchema);
+    if (parser.supportsPreview()) {
+      return SchemaGuesser.guessSchema(parser.getSchemaAndSample(eventByte));
+    } else {
+      return SchemaGuesser.guessSchema(parser.getEventSchema(eventByte));
+    }
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 4b9368b0b..d82d28ce9 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -213,7 +213,7 @@ public class FileStreamProtocol extends Protocol {
 
     EventSchema eventSchema = parser.getEventSchema(dataByte);
 
-    GuessSchema result = SchemaGuesser.guessSchma(eventSchema);
+    GuessSchema result = SchemaGuesser.guessSchema(eventSchema);
 
     return result;
   }
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
index 39a5dad26..be6882957 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/HttpStreamProtocol.java
@@ -112,7 +112,7 @@ public class HttpStreamProtocol extends PullProtocol {
             dataByte.addAll(dataByte);
         }
         EventSchema eventSchema= parser.getEventSchema(dataByte);
-        GuessSchema result = SchemaGuesser.guessSchma(eventSchema);
+        GuessSchema result = SchemaGuesser.guessSchema(eventSchema);
 
         return result;
     }
diff --git a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/AdapterGuessInfo.java
similarity index 51%
copy from streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/AdapterGuessInfo.java
index 482bf9fa3..6aa137fbd 100644
--- a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IParser.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/guess/AdapterGuessInfo.java
@@ -15,28 +15,41 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.connect.api;
 
-import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.model.connect.grounding.FormatDescription;
+package org.apache.streampipes.model.connect.guess;
+
 import org.apache.streampipes.model.schema.EventSchema;
 
-import java.io.InputStream;
 import java.util.List;
+import java.util.Map;
+
+public class AdapterGuessInfo {
+
+  private EventSchema eventSchema;
+  private List<Map<String, GuessTypeInfo>> eventPreview;
+
+  public AdapterGuessInfo() {
+  }
 
-public interface IParser {
+  public AdapterGuessInfo(EventSchema eventSchema,
+                          Map<String, GuessTypeInfo> singleEventSample) {
+    this.eventSchema = eventSchema;
+    this.eventPreview = List.of(singleEventSample);
+  }
 
-  IParser getInstance(FormatDescription formatDescription);
+  public EventSchema getEventSchema() {
+    return eventSchema;
+  }
 
-  void parse(InputStream data, EmitBinaryEvent emitBinaryEvent) throws ParseException;
+  public void setEventSchema(EventSchema eventSchema) {
+    this.eventSchema = eventSchema;
+  }
 
-  List<byte[]> parseNEvents(InputStream data, int n) throws ParseException;
+  public List<Map<String, GuessTypeInfo>> getEventPreview() {
+    return eventPreview;
+  }
 
-  /**
-   * Pass one event to Parser to get the event schema
-   *
-   * @param oneEvent
-   * @return
-   */
-  EventSchema getEventSchema(List<byte[]> oneEvent);
+  public void setEventPreview(List<Map<String, GuessTypeInfo>> eventPreview) {
+    this.eventPreview = eventPreview;
+  }
 }
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/GuessSchemaBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/GuessSchemaBuilder.java
index 91e0da129..8f88a54eb 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/GuessSchemaBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/GuessSchemaBuilder.java
@@ -18,18 +18,20 @@
 package org.apache.streampipes.sdk.builder.adapter;
 
 import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 public class GuessSchemaBuilder {
 
   private List<EventProperty> eventProperties;
+  private Map<String, GuessTypeInfo> samples;
 
   private GuessSchemaBuilder() {
     this.eventProperties = new ArrayList<>();
+    this.samples = new HashMap<>();
   }
 
   /**
@@ -39,6 +41,13 @@ public class GuessSchemaBuilder {
     return new GuessSchemaBuilder();
   }
 
+  public GuessSchemaBuilder sample(String runtimeName,
+                                   Object sampleValue) {
+    this.samples.put(runtimeName, new GuessTypeInfo(sampleValue.getClass().getCanonicalName(), sampleValue));
+
+    return this;
+  }
+
   public GuessSchemaBuilder property(EventProperty property) {
     this.eventProperties.add(property);
 
@@ -56,6 +65,7 @@ public class GuessSchemaBuilder {
     eventSchema.setEventProperties(eventProperties);
 
     guessSchema.setEventSchema(eventSchema);
+    guessSchema.setEventPreview(List.of(this.samples));
 
     return guessSchema;
   }