You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2018/12/25 16:09:33 UTC

[incubator-plc4x] 02/03: [CALCITE] Running Version.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/calcite-adapter
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit df1a6dbfb8cd456ea424152f39cbf022325274c8
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sun Dec 23 21:33:47 2018 +0100

    [CALCITE] Running Version.
---
 integrations/apache-calcite/pom.xml                |  63 ++++++-
 .../main/java/org/apache/plc4x/Plc4xBaseTable.java | 193 +++++++++++++++++++++
 .../main/java/org/apache/plc4x/Plc4xSchema.java    |  63 +++++--
 .../java/org/apache/plc4x/Plc4xSchemaFactory.java  |  33 +++-
 .../java/org/apache/plc4x/Plc4xStreamTable.java    |  45 +++++
 .../src/main/java/org/apache/plc4x/Plc4xTable.java |  88 +++-------
 .../java/org/apache/plc4x/DriverManagerTest.java   |  69 +++++++-
 .../java/org/apache/plc4x/Plc4XBaseTableTest.java  |  48 +++++
 .../test/java/org/apache/plc4x/Plc4xTableTest.java |  24 ---
 .../apache-calcite/src/test/resources/example.yml  |   4 +-
 .../apache-calcite/src/test/resources/logback.xml  |  38 ++++
 .../apache-calcite/src/test/resources/model.yml    |  33 ++++
 .../java/scraper/config/JobConfiguration.java      |   2 +-
 13 files changed, 588 insertions(+), 115 deletions(-)

diff --git a/integrations/apache-calcite/pom.xml b/integrations/apache-calcite/pom.xml
index 6688586..5038722 100644
--- a/integrations/apache-calcite/pom.xml
+++ b/integrations/apache-calcite/pom.xml
@@ -1,4 +1,23 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -18,6 +37,15 @@
       <version>1.18.0-SNAPSHOT</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-linq4j</artifactId>
+      <version>1.18.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.plc4x</groupId>
       <artifactId>plc4j-scraper</artifactId>
       <version>0.3.0-SNAPSHOT</version>
@@ -27,9 +55,42 @@
       <groupId>org.apache.plc4x</groupId>
       <artifactId>plc4j-driver-simulated</artifactId>
       <version>0.3.0-SNAPSHOT</version>
-      <scope>test</scope>
+      <!--<scope>test</scope>-->
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>3.0.0</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>assemble-all</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <configuration>
+          <usedDependencies>
+            <usedDependency>org.apache.plc4x:plc4j-driver-simulated</usedDependency>
+          </usedDependencies>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
 
 </project>
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java
new file mode 100644
index 0000000..52b306b
--- /dev/null
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java
@@ -0,0 +1,193 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rel.*;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.plc4x.java.scraper.config.JobConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class Plc4xBaseTable extends AbstractTable {
+
+    private static final Logger logger = LoggerFactory.getLogger(Plc4xBaseTable.class);
+
+    private final BlockingQueue<Plc4xSchema.Record> queue;
+    private final JobConfiguration conf;
+    private final long tableCutoff;
+    private Plc4xSchema.Record current;
+    private List<String> names;
+
+    public Plc4xBaseTable(BlockingQueue<Plc4xSchema.Record> queue, JobConfiguration conf, long tableCutoff) {
+        this.tableCutoff = tableCutoff;
+        logger.info("Instantiating new PLC4X Table with configuration: {}", conf);
+        this.queue = queue;
+        this.conf = conf;
+        // Extract names
+        names = new ArrayList<>(conf.getFields().keySet());
+    }
+
+    @Override
+    public Statistic getStatistic() {
+        return new Statistic() {
+
+            public Double getRowCount() {
+                return tableCutoff > 0 ? (double)tableCutoff : null;
+            }
+
+            public boolean isKey(ImmutableBitSet columns) {
+                return false;
+            }
+
+            public List<RelReferentialConstraint> getReferentialConstraints() {
+                return Collections.emptyList();
+            }
+
+            public List<RelCollation> getCollations() {
+                return Collections.singletonList(RelCollationImpl.of(new RelFieldCollation(0, RelFieldCollation.Direction.ASCENDING)));
+            }
+
+            public RelDistribution getDistribution() {
+                return RelDistributionTraitDef.INSTANCE.getDefault();
+            }
+        };
+    }
+
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+        // The row type is inferred from a sample: peek at the first scraped record
+        // (fetched asynchronously) and give up if none shows up within 5 seconds
+        CompletableFuture<Plc4xSchema.Record> future = CompletableFuture.supplyAsync(new FirstElementFetcher(queue));
+        Plc4xSchema.Record first;
+        try {
+            first = future.get(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Thread was interrupted!", e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException("Unable to fetch first record and infer arguments!", e);
+        }
+        logger.info("Inferring types for Table '{}' based on values: {}", conf.getName(), first.values);
+        // One column per configured field, typed after the runtime class of the sampled value
+        List<RelDataType> types = names.stream()
+            .map(n -> {
+                Object o = first.values.get(n);
+                logger.debug("Infer field '{}' as class '{}'", n, o.getClass());
+                return typeFactory.createJavaType(o.getClass());
+            })
+            .collect(Collectors.toList());
+        List<String> pre = new ArrayList<>(Arrays.asList("timestamp", "source"));
+        pre.addAll(names);
+        List<RelDataType> preTypes = Stream.of(Timestamp.class, String.class)
+            .map(typeFactory::createJavaType)
+            .collect(Collectors.toList());
+        preTypes.addAll(types);
+        return typeFactory.createStructType(preTypes, pre);
+    }
+
+    /**
+     * Scans the record queue; a positive tableCutoff ends the scan after that many rows, else it streams.
+     */
+    public Enumerable<Object[]> scanInternal(DataContext root) {
+        return new AbstractEnumerable<Object[]>() {
+            @Override
+            public Enumerator<Object[]> enumerator() {
+                return new Enumerator<Object[]>() {
+
+                    private final AtomicLong counter = new AtomicLong(0);
+
+                    @Override
+                    public Object[] current() {
+                        List<Object> objects = new ArrayList<>(Arrays.asList(new Timestamp(current.timestamp.toEpochMilli()), current.source));
+                        List<Object> objects2 = names.stream().map(name -> current.values.get(name)).collect(Collectors.toList());
+                        objects.addAll(objects2);
+                        return objects.toArray();
+                    }
+
+                    @Override
+                    public boolean moveNext() {
+                        // Table mode: stop BEFORE take(), so no extra record is awaited and thrown away
+                        if (tableCutoff > 0L && counter.get() >= tableCutoff) {
+                            return false;
+                        }
+                        try {
+                            current = queue.take();
+                            counter.incrementAndGet();
+                            return true;
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
+                        return false;
+                    }
+
+                    @Override
+                    public void reset() {
+                        counter.set(0);
+                    }
+
+                    @Override
+                    public void close() {
+                        // Nothing to release here
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Waits until a first (non null) element is in the queue; peeks only and pauses ~1 ms between polls.
+     */
+    private static class FirstElementFetcher implements Supplier<Plc4xSchema.Record> {
+
+        private final BlockingQueue<Plc4xSchema.Record> queue;
+
+        private FirstElementFetcher(BlockingQueue<Plc4xSchema.Record> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public Plc4xSchema.Record get() {
+            Plc4xSchema.Record first;
+            while ((first = queue.peek()) == null) {
+                java.util.concurrent.locks.LockSupport.parkNanos(1_000_000L);
+            }
+            return first;
+        }
+    }
+
+}
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java
index 51021d6..9e38bfc 100644
--- a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java
@@ -1,3 +1,21 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
 package org.apache.plc4x;
 
 import org.apache.calcite.schema.Table;
@@ -7,54 +25,73 @@ import org.apache.plc4x.java.scraper.Scraper;
 import org.apache.plc4x.java.scraper.config.JobConfiguration;
 import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
 
+import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.stream.Collectors;
 
-/**
- * Scraper -> Handler -> Table
- */
 public class Plc4xSchema extends AbstractSchema {
 
-    private final ScraperConfiguration configuration;
-    private final Scraper scraper;
-    private final QueueHandler handler;
-    private final Map<String, BlockingQueue<Object[]>> queues;
-    private final Map<String, Table> tableMap;
+    protected final ScraperConfiguration configuration;
+    protected final Scraper scraper;
+    protected final QueueHandler handler;
+    protected final Map<String, BlockingQueue<Record>> queues;
+    protected final Map<String, Table> tableMap;
 
-    Plc4xSchema(ScraperConfiguration configuration) {
+    public Plc4xSchema(ScraperConfiguration configuration, long tableCutoff) {
         this.configuration = configuration;
         this.handler = new QueueHandler();
         this.scraper = new Scraper(configuration, handler);
         this.queues = configuration.getJobConfigurations().stream()
             .collect(Collectors.toMap(
                 JobConfiguration::getName,
-                conf -> new ArrayBlockingQueue<Object[]>(100)
+                conf -> new ArrayBlockingQueue<Record>(1000)
             ));
         // Create the tables
         this.tableMap = configuration.getJobConfigurations().stream()
             .collect(Collectors.toMap(
                 JobConfiguration::getName,
-                conf -> new Plc4xTable(queues.get(conf.getName()), conf)
+                conf -> defineTable(queues.get(conf.getName()), conf, tableCutoff)
             ));
         // Start the scraper
         this.scraper.start();
     }
 
+    Table defineTable(BlockingQueue<Record> queue, JobConfiguration configuration, Long limit) {
+        if (limit <= 0) {
+            return new Plc4xStreamTable(queue, configuration);
+        } else {
+            return new Plc4xTable(queue, configuration, limit);
+        }
+    }
+
     @Override
     protected Map<String, Table> getTableMap() {
         // Return a map of all jobs
         return this.tableMap;
     }
 
+    public static class Record {
+
+        public final Instant timestamp;
+        public final String source;
+        public final Map<String, Object> values;
+
+        public Record(Instant timestamp, String source, Map<String, Object> values) {
+            this.timestamp = timestamp;
+            this.source = source;
+            this.values = values;
+        }
+    }
+
     class QueueHandler implements ResultHandler {
 
         @Override
         public void handle(String job, String alias, Map<String, Object> results) {
-            Object[] objects = results.values().toArray();
             try {
-                queues.get(job).put(objects);
+                Record record = new Record(Instant.now(), alias, results);
+                queues.get(job).put(record);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java
index 368f9fa..6b1d16f 100644
--- a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java
@@ -1,3 +1,21 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
 package org.apache.plc4x;
 
 import org.apache.calcite.schema.Schema;
@@ -13,8 +31,9 @@ public class Plc4xSchemaFactory implements SchemaFactory {
 
     @Override
     public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+        // Fetch config
         Object config = operand.get("config");
-        Validate.notNull(config, "No configuration file given. Please specify one with 'config=...'");
+        Validate.notNull(config, "No configuration file given. Please specify operand 'config'...'");
         // Load configuration from file
         ScraperConfiguration configuration;
         try {
@@ -22,8 +41,18 @@ public class Plc4xSchemaFactory implements SchemaFactory {
         } catch (IOException e) {
             throw new RuntimeException("Unable to load configuration file!", e);
         }
+
+        // Fetch limit
+        Object limit = operand.get("limit");
+        Validate.notNull(limit, "No limit for the number of rows for a table. Please specify operand 'config'...'");
+        long parsedLimit;
+        try {
+            parsedLimit = Long.parseLong(limit.toString());
+        } catch (NumberFormatException e) {
+            throw new RuntimeException("Given limit '" + limit + "' cannot be parsed to valid long!", e);
+        }
         // Pass the configuration to the Schema
-        return new Plc4xSchema(configuration);
+        return new Plc4xSchema(configuration, parsedLimit);
     }
 
 }
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xStreamTable.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xStreamTable.java
new file mode 100644
index 0000000..7725a1d
--- /dev/null
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xStreamTable.java
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.plc4x.java.scraper.config.JobConfiguration;
+
+import java.util.concurrent.BlockingQueue;
+
+public class Plc4xStreamTable extends Plc4xBaseTable implements ScannableTable, StreamableTable {
+
+    public Plc4xStreamTable(BlockingQueue<Plc4xSchema.Record> queue, JobConfiguration conf) {
+        super(queue, conf, -1L);
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root) {
+        return super.scanInternal(root);
+    }
+
+    @Override
+    public Table stream() {
+        return this;
+    }
+}
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java
index ab19bd5..cd7c142 100644
--- a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java
@@ -1,83 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
 package org.apache.plc4x;
 
 import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.StreamableTable;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AbstractTable;
 import org.apache.plc4x.java.scraper.config.JobConfiguration;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
-public class Plc4xTable extends AbstractTable implements StreamableTable, ScannableTable {
+public class Plc4xTable extends Plc4xBaseTable implements ScannableTable {
 
-    private final BlockingQueue<Object[]> queue;
-    private final JobConfiguration conf;
-    private Object[] current;
-
-    public Plc4xTable(BlockingQueue<Object[]> queue, JobConfiguration conf) {
-        this.queue = queue;
-        this.conf = conf;
-    }
-
-    @Override
-    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-        // Create the table spec
-        List<String> names = new ArrayList<>();
-        List<RelDataType> types = new ArrayList<>();
-        for (Map.Entry<String, String> entry : conf.getFields().entrySet()) {
-            names.add(entry.getKey());
-            types.add(typeFactory.createJavaType(String.class));
-        }
-        return typeFactory.createStructType(types, names);
+    public Plc4xTable(BlockingQueue<Plc4xSchema.Record> queue, JobConfiguration conf, long tableCutoff) {
+        super(queue, conf, tableCutoff);
     }
 
     @Override
     public Enumerable<Object[]> scan(DataContext root) {
-        return new AbstractEnumerable<Object[]>() {
-            @Override
-            public Enumerator<Object[]> enumerator() {
-                return new Enumerator<Object[]>() {
-                    @Override
-                    public Object[] current() {
-                        return current;
-                    }
-
-                    @Override
-                    public boolean moveNext() {
-                        try {
-                            current = queue.take();
-                            return true;
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                        }
-                        return false;
-                    }
-
-                    @Override
-                    public void reset() {
-                        // Unimplemented
-                    }
-
-                    @Override
-                    public void close() {
-                        // Unimplemented
-                    }
-                };
-            }
-        };
+        return super.scanInternal(root);
     }
 
-    @Override
-    public Table stream() {
-        return this;
-    }
 }
diff --git a/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java b/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java
index 9bdf9a3..5a6bc7c 100644
--- a/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java
+++ b/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java
@@ -1,8 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
 package org.apache.plc4x;
 
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.jdbc.Driver;
-import org.apache.calcite.schema.Schema;
 import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
 import org.junit.jupiter.api.Test;
 
@@ -10,7 +27,6 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collections;
 import java.util.Properties;
 
 public class DriverManagerTest {
@@ -18,18 +34,57 @@ public class DriverManagerTest {
     @Test
     void instanciateJdbcConnection() throws SQLException, IOException {
         Driver driver = new Driver();
-        Connection connection = driver.connect("jdbc:calcite://asdf;config=abc", new Properties());
+        Connection connection = driver.connect("jdbc:calcite://asdf;config=abc;lex=MYSQL_ANSI", new Properties());
 
         CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-        calciteConnection.getRootSchema().add("plc4x", new Plc4xSchema(ScraperConfiguration.fromFile("src/test/resources/example.yml")));
+        calciteConnection.getRootSchema().add("plc4x", new Plc4xSchema(ScraperConfiguration.fromFile("src/test/resources/example.yml"), 10));
+
+        // ResultSet rs = connection.prepareStatement("SELECT STREAM \"test\", \"test\" * 2, \"test2\" FROM \"plc4x\".\"job1\"").executeQuery();
+        ResultSet rs = connection.prepareStatement("SELECT STREAM * FROM \"plc4x\".\"job1\" WHERE source = 'test'").executeQuery();
+
+        // Print the header
+        int count = rs.getMetaData().getColumnCount();
+        for (int i = 1; i <= count; i++) {
+            System.out.print(rs.getMetaData().getColumnLabel(i) + "(" + rs.getMetaData().getColumnTypeName(i) + ")" + "\t");
+        }
+        System.out.println("");
+
+        while (rs.next()) {
+            for (int i = 1; i <= count; i++) {
+                System.out.print(rs.getString(i) + "\t");
+            }
+            System.out.println("");
+        }
+
+        connection.close();
+    }
+
+    @Test
+    void instantiateDirect() throws IOException, SQLException {
+        Driver driver = new Driver();
+        Connection connection = driver.connect("jdbc:calcite:model=src/test/resources/model.yml;lex=MYSQL_ANSI", new Properties());
 
-        ResultSet rs = connection.prepareStatement("SELECT STREAM * FROM \"plc4x\".\"job1\"").executeQuery();
+        // ResultSet rs = connection.prepareStatement("SELECT STREAM \"test\", \"test\" * 2, \"test2\" FROM \"plc4x\".\"job1\"").executeQuery();
+        ResultSet rs = connection.prepareStatement("SELECT * FROM \"plc4x-tables\".\"job1\"").executeQuery();
+
+        // Print the header
+        int count = rs.getMetaData().getColumnCount();
+        for (int i = 1; i <= count; i++) {
+            System.out.print(rs.getMetaData().getColumnLabel(i) + "(" + rs.getMetaData().getColumnTypeName(i) + ")" + "\t");
+        }
+        System.out.println("");
+
+        int row = 1;
 
         while (rs.next()) {
-            System.out.print("Spalte 1: " + rs.getString(1) + "\t");
-            System.out.println("Spalte 2: " + rs.getString(2));
+            System.out.print(row++ + "\t");
+            for (int i = 1; i <= count; i++) {
+                System.out.print(rs.getString(i) + "\t");
+            }
+            System.out.println("");
         }
 
         connection.close();
     }
+
 }
diff --git a/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java b/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java
new file mode 100644
index 0000000..c89d799
--- /dev/null
+++ b/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.plc4x.java.scraper.config.JobConfiguration;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+class Plc4XBaseTableTest implements WithAssertions {
+
+    @Test
+    void testOnBlockingQueue() {
+        ArrayBlockingQueue<Plc4xSchema.Record> queue = new ArrayBlockingQueue<>(100);
+        Plc4xStreamTable table = new Plc4xStreamTable(queue, new JobConfiguration("job1", 100,
+            Collections.emptyList(),
+            Collections.singletonMap("key", "address")));
+
+        Map<String, Object> objects = Collections.singletonMap("key", "value");
+        queue.add(new Plc4xSchema.Record(Instant.now(), "", objects));
+
+        Enumerator<Object[]> enumerator = table.scan(null).enumerator();
+
+        assertThat(enumerator.moveNext()).isTrue();
+        assertThat(enumerator.current()).hasSize(3).endsWith("", "value"); // row = [timestamp, source, fields...]
+    }
+}
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4xTableTest.java b/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4xTableTest.java
deleted file mode 100644
index 6d4fb17..0000000
--- a/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4xTableTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.plc4x;
-
-import org.apache.calcite.linq4j.Enumerator;
-import org.assertj.core.api.WithAssertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.concurrent.ArrayBlockingQueue;
-
-class Plc4xTableTest implements WithAssertions {
-
-    @Test
-    void testOnBlockingQueue() {
-        ArrayBlockingQueue<Object[]> queue = new ArrayBlockingQueue<Object[]>(100);
-        Plc4xTable table = new Plc4xTable(queue, null);
-
-        Object[] objects = new Object[0];
-        queue.add(objects);
-
-        Enumerator<Object[]> enumerator = table.scan(null).enumerator();
-
-        assertThat(enumerator.moveNext()).isTrue();
-        assertThat(enumerator.current()).isEqualTo(objects);
-    }
-}
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/test/resources/example.yml b/integrations/apache-calcite/src/test/resources/example.yml
index 93b58e2..7294ab0 100644
--- a/integrations/apache-calcite/src/test/resources/example.yml
+++ b/integrations/apache-calcite/src/test/resources/example.yml
@@ -19,12 +19,14 @@
 ---
 sources:
   test: test:test
+  test2: test:test2
 
 jobs:
   - name: job1
-    scrapeRate: 1000
+    scrapeRate: 10
     sources:
     - test
+    - test2
     fields:
       test: 'RANDOM/test:Integer'
       test2: 'RANDOM/test:String'
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/test/resources/logback.xml b/integrations/apache-calcite/src/test/resources/logback.xml
new file mode 100644
index 0000000..dd243bd
--- /dev/null
+++ b/integrations/apache-calcite/src/test/resources/logback.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="
+                  http://ch.qos.logback/xml/ns/logback
+                  https://raw.githubusercontent.com/enricopulatzo/logback-XSD/master/src/main/xsd/logback.xsd">
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <!-- encoders are assigned the type
+         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="info">
+    <appender-ref ref="STDOUT" />
+  </root>
+
+</configuration>
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/test/resources/model.yml b/integrations/apache-calcite/src/test/resources/model.yml
new file mode 100644
index 0000000..07b7479
--- /dev/null
+++ b/integrations/apache-calcite/src/test/resources/model.yml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# A YAML model of a simple Calcite schema.
+#
+version: 1.0
+defaultSchema: PLC4X
+schemas:
+- name: PLC4X
+  type: custom
+  factory: org.apache.plc4x.Plc4xSchemaFactory
+  operand:
+    config: src/test/resources/example.yml  # module-relative so the model is not tied to one developer's checkout
+    limit: -1
+- name: PLC4X-TABLES
+  type: custom
+  factory: org.apache.plc4x.Plc4xSchemaFactory
+  operand:
+    config: src/test/resources/example.yml  # module-relative so the model is not tied to one developer's checkout
+    limit: 100
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
index 2fd50f7..5549432 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
@@ -43,7 +43,7 @@ public class JobConfiguration {
      * @param fields Map from field alias (how it is named in the result map) to plc4x field query
      */
     @JsonCreator
-    JobConfiguration(@JsonProperty(value = "name", required = true) String name,
+    public JobConfiguration(@JsonProperty(value = "name", required = true) String name,
                             @JsonProperty(value = "scrapeRate", required = true) int scrapeRate,
                             @JsonProperty(value = "sources", required = true) List<String> sources,
                             @JsonProperty(value = "fields", required = true) Map<String, String> fields) {