You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2018/12/25 16:09:33 UTC
[incubator-plc4x] 02/03: [CALCITE] Running Version.
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch feature/calcite-adapter
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit df1a6dbfb8cd456ea424152f39cbf022325274c8
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sun Dec 23 21:33:47 2018 +0100
[CALCITE] Running Version.
---
integrations/apache-calcite/pom.xml | 63 ++++++-
.../main/java/org/apache/plc4x/Plc4xBaseTable.java | 193 +++++++++++++++++++++
.../main/java/org/apache/plc4x/Plc4xSchema.java | 63 +++++--
.../java/org/apache/plc4x/Plc4xSchemaFactory.java | 33 +++-
.../java/org/apache/plc4x/Plc4xStreamTable.java | 45 +++++
.../src/main/java/org/apache/plc4x/Plc4xTable.java | 88 +++-------
.../java/org/apache/plc4x/DriverManagerTest.java | 69 +++++++-
.../java/org/apache/plc4x/Plc4XBaseTableTest.java | 48 +++++
.../test/java/org/apache/plc4x/Plc4xTableTest.java | 24 ---
.../apache-calcite/src/test/resources/example.yml | 4 +-
.../apache-calcite/src/test/resources/logback.xml | 38 ++++
.../apache-calcite/src/test/resources/model.yml | 33 ++++
.../java/scraper/config/JobConfiguration.java | 2 +-
13 files changed, 588 insertions(+), 115 deletions(-)
diff --git a/integrations/apache-calcite/pom.xml b/integrations/apache-calcite/pom.xml
index 6688586..5038722 100644
--- a/integrations/apache-calcite/pom.xml
+++ b/integrations/apache-calcite/pom.xml
@@ -1,4 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -18,6 +37,15 @@
<version>1.18.0-SNAPSHOT</version>
</dependency>
<dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-linq4j</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-scraper</artifactId>
<version>0.3.0-SNAPSHOT</version>
@@ -27,9 +55,42 @@
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-driver-simulated</artifactId>
<version>0.3.0-SNAPSHOT</version>
- <scope>test</scope>
+ <!--<scope>test</scope>-->
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>assemble-all</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <usedDependencies>
+ <usedDependency>org.apache.plc4x:plc4j-driver-simulated</usedDependency>
+ </usedDependencies>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java
new file mode 100644
index 0000000..52b306b
--- /dev/null
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java
@@ -0,0 +1,193 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rel.*;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.plc4x.java.scraper.config.JobConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class Plc4xBaseTable extends AbstractTable {
+
+ private static final Logger logger = LoggerFactory.getLogger(Plc4xBaseTable.class);
+
+ private final BlockingQueue<Plc4xSchema.Record> queue;
+ private final JobConfiguration conf;
+ private final long tableCutoff;
+ private Plc4xSchema.Record current;
+ private List<String> names;
+
+ public Plc4xBaseTable(BlockingQueue<Plc4xSchema.Record> queue, JobConfiguration conf, long tableCutoff) {
+ this.tableCutoff = tableCutoff;
+ logger.info("Instantiating new PLC4X Table with configuration: {}", conf);
+ this.queue = queue;
+ this.conf = conf;
+ // Extract names
+ names = new ArrayList<>(conf.getFields().keySet());
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return new Statistic() {
+
+ public Double getRowCount() {
+ return tableCutoff > 0 ? (double)tableCutoff : null;
+ }
+
+ public boolean isKey(ImmutableBitSet columns) {
+ return false;
+ }
+
+ public List<RelReferentialConstraint> getReferentialConstraints() {
+ return Collections.emptyList();
+ }
+
+ public List<RelCollation> getCollations() {
+ return Collections.singletonList(RelCollationImpl.of(new RelFieldCollation(0, RelFieldCollation.Direction.ASCENDING)));
+ }
+
+ public RelDistribution getDistribution() {
+ return RelDistributionTraitDef.INSTANCE.getDefault();
+ }
+ };
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ // Create the table spec
+ // Block until the first result is in the queue
+ CompletableFuture<Plc4xSchema.Record> future = CompletableFuture.supplyAsync(new FirstElementFetcher(queue));
+ Plc4xSchema.Record first;
+ try {
+ first = future.get(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Thread was interrupted!", e);
+ } catch (ExecutionException | TimeoutException e) {
+ throw new RuntimeException("Unable to fetch first record and infer arguments!");
+ }
+ logger.info("Inferring types for Table '{}' based on values: {}", conf.getName(), first.values);
+ // Extract types
+ List<RelDataType> types = names.stream()
+ .map(n -> {
+ Object o = first.values.get(n);
+ logger.debug("Infer field '{}' as class '{}'", n, o.getClass());
+ return typeFactory.createJavaType(o.getClass());
+ })
+ .collect(Collectors.toList());
+ List<String> pre = new ArrayList<>(Arrays.asList("timestamp", "source"));
+ pre.addAll(names);
+ List<RelDataType> preTypes = Stream.of(Timestamp.class, String.class)
+ .map(typeFactory::createJavaType)
+ .collect(Collectors.toList());
+ preTypes.addAll(types);
+ return typeFactory.createStructType(preTypes, pre);
+ }
+
+ /**
+ * if tableCutoff is positive, then the row gets limited to that.
+ */
+ public Enumerable<Object[]> scanInternal(DataContext root) {
+ return new AbstractEnumerable<Object[]>() {
+ @Override
+ public Enumerator<Object[]> enumerator() {
+ return new Enumerator<Object[]>() {
+
+ private final AtomicLong counter = new AtomicLong(0);
+
+ @Override
+ public Object[] current() {
+ List<Object> objects = new ArrayList<>(Arrays.asList(new Timestamp(current.timestamp.toEpochMilli()), current.source));
+ List<Object> objects2 = names.stream().map(name -> current.values.get(name)).collect(Collectors.toList());
+ objects.addAll(objects2);
+ return objects.toArray();
+ }
+
+ @Override
+ public boolean moveNext() {
+ try {
+ current = queue.take();
+ // If stream, simply return
+ if (tableCutoff <= 0L) {
+ return true;
+ }
+ // If table, return if below cutoff
+ return counter.getAndIncrement() < tableCutoff;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ counter.set(0);
+ }
+
+ @Override
+ public void close() {
+ // Unimplemented
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Waits until a first (non null) element is in the queue
+ */
+ private static class FirstElementFetcher implements Supplier<Plc4xSchema.Record> {
+
+ private final BlockingQueue<Plc4xSchema.Record> queue;
+
+ private FirstElementFetcher(BlockingQueue<Plc4xSchema.Record> queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public Plc4xSchema.Record get() {
+ Plc4xSchema.Record first;
+ do {
+ first = queue.peek();
+ } while (first == null);
+ return first;
+ }
+ }
+
+}
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java
index 51021d6..9e38bfc 100644
--- a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java
@@ -1,3 +1,21 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
package org.apache.plc4x;
import org.apache.calcite.schema.Table;
@@ -7,54 +25,73 @@ import org.apache.plc4x.java.scraper.Scraper;
import org.apache.plc4x.java.scraper.config.JobConfiguration;
import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
+import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
-/**
- * Scraper -> Handler -> Table
- */
public class Plc4xSchema extends AbstractSchema {
- private final ScraperConfiguration configuration;
- private final Scraper scraper;
- private final QueueHandler handler;
- private final Map<String, BlockingQueue<Object[]>> queues;
- private final Map<String, Table> tableMap;
+ protected final ScraperConfiguration configuration;
+ protected final Scraper scraper;
+ protected final QueueHandler handler;
+ protected final Map<String, BlockingQueue<Record>> queues;
+ protected final Map<String, Table> tableMap;
- Plc4xSchema(ScraperConfiguration configuration) {
+ public Plc4xSchema(ScraperConfiguration configuration, long tableCutoff) {
this.configuration = configuration;
this.handler = new QueueHandler();
this.scraper = new Scraper(configuration, handler);
this.queues = configuration.getJobConfigurations().stream()
.collect(Collectors.toMap(
JobConfiguration::getName,
- conf -> new ArrayBlockingQueue<Object[]>(100)
+ conf -> new ArrayBlockingQueue<Record>(1000)
));
// Create the tables
this.tableMap = configuration.getJobConfigurations().stream()
.collect(Collectors.toMap(
JobConfiguration::getName,
- conf -> new Plc4xTable(queues.get(conf.getName()), conf)
+ conf -> defineTable(queues.get(conf.getName()), conf, tableCutoff)
));
// Start the scraper
this.scraper.start();
}
+ Table defineTable(BlockingQueue<Record> queue, JobConfiguration configuration, Long limit) {
+ if (limit <= 0) {
+ return new Plc4xStreamTable(queue, configuration);
+ } else {
+ return new Plc4xTable(queue, configuration, limit);
+ }
+ }
+
@Override
protected Map<String, Table> getTableMap() {
// Return a map of all jobs
return this.tableMap;
}
+ public static class Record {
+
+ public final Instant timestamp;
+ public final String source;
+ public final Map<String, Object> values;
+
+ public Record(Instant timestamp, String source, Map<String, Object> values) {
+ this.timestamp = timestamp;
+ this.source = source;
+ this.values = values;
+ }
+ }
+
class QueueHandler implements ResultHandler {
@Override
public void handle(String job, String alias, Map<String, Object> results) {
- Object[] objects = results.values().toArray();
try {
- queues.get(job).put(objects);
+ Record record = new Record(Instant.now(), alias, results);
+ queues.get(job).put(record);
} catch (InterruptedException e) {
e.printStackTrace();
}
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java
index 368f9fa..6b1d16f 100644
--- a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java
@@ -1,3 +1,21 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
package org.apache.plc4x;
import org.apache.calcite.schema.Schema;
@@ -13,8 +31,9 @@ public class Plc4xSchemaFactory implements SchemaFactory {
@Override
public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+ // Fetch config
Object config = operand.get("config");
- Validate.notNull(config, "No configuration file given. Please specify one with 'config=...'");
+ Validate.notNull(config, "No configuration file given. Please specify operand 'config'...'");
// Load configuration from file
ScraperConfiguration configuration;
try {
@@ -22,8 +41,18 @@ public class Plc4xSchemaFactory implements SchemaFactory {
} catch (IOException e) {
throw new RuntimeException("Unable to load configuration file!", e);
}
+
+ // Fetch limit
+ Object limit = operand.get("limit");
+ Validate.notNull(limit, "No limit for the number of rows for a table. Please specify operand 'config'...'");
+ long parsedLimit;
+ try {
+ parsedLimit = Long.parseLong(limit.toString());
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Given limit '" + limit + "' cannot be parsed to valid long!", e);
+ }
// Pass the configuration to the Schema
- return new Plc4xSchema(configuration);
+ return new Plc4xSchema(configuration, parsedLimit);
}
}
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xStreamTable.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xStreamTable.java
new file mode 100644
index 0000000..7725a1d
--- /dev/null
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xStreamTable.java
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.plc4x.java.scraper.config.JobConfiguration;
+
+import java.util.concurrent.BlockingQueue;
+
+public class Plc4xStreamTable extends Plc4xBaseTable implements ScannableTable, StreamableTable {
+
+ public Plc4xStreamTable(BlockingQueue<Plc4xSchema.Record> queue, JobConfiguration conf) {
+ super(queue, conf, -1L);
+ }
+
+ @Override
+ public Enumerable<Object[]> scan(DataContext root) {
+ return super.scanInternal(root);
+ }
+
+ @Override
+ public Table stream() {
+ return this;
+ }
+}
diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java
index ab19bd5..cd7c142 100644
--- a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java
+++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java
@@ -1,83 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
package org.apache.plc4x;
import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.StreamableTable;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.plc4x.java.scraper.config.JobConfiguration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
-public class Plc4xTable extends AbstractTable implements StreamableTable, ScannableTable {
+public class Plc4xTable extends Plc4xBaseTable implements ScannableTable {
- private final BlockingQueue<Object[]> queue;
- private final JobConfiguration conf;
- private Object[] current;
-
- public Plc4xTable(BlockingQueue<Object[]> queue, JobConfiguration conf) {
- this.queue = queue;
- this.conf = conf;
- }
-
- @Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- // Create the table spec
- List<String> names = new ArrayList<>();
- List<RelDataType> types = new ArrayList<>();
- for (Map.Entry<String, String> entry : conf.getFields().entrySet()) {
- names.add(entry.getKey());
- types.add(typeFactory.createJavaType(String.class));
- }
- return typeFactory.createStructType(types, names);
+ public Plc4xTable(BlockingQueue<Plc4xSchema.Record> queue, JobConfiguration conf, long tableCutoff) {
+ super(queue, conf, tableCutoff);
}
@Override
public Enumerable<Object[]> scan(DataContext root) {
- return new AbstractEnumerable<Object[]>() {
- @Override
- public Enumerator<Object[]> enumerator() {
- return new Enumerator<Object[]>() {
- @Override
- public Object[] current() {
- return current;
- }
-
- @Override
- public boolean moveNext() {
- try {
- current = queue.take();
- return true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return false;
- }
-
- @Override
- public void reset() {
- // Unimplemented
- }
-
- @Override
- public void close() {
- // Unimplemented
- }
- };
- }
- };
+ return super.scanInternal(root);
}
- @Override
- public Table stream() {
- return this;
- }
}
diff --git a/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java b/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java
index 9bdf9a3..5a6bc7c 100644
--- a/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java
+++ b/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java
@@ -1,8 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
package org.apache.plc4x;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.Driver;
-import org.apache.calcite.schema.Schema;
import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
import org.junit.jupiter.api.Test;
@@ -10,7 +27,6 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Collections;
import java.util.Properties;
public class DriverManagerTest {
@@ -18,18 +34,57 @@ public class DriverManagerTest {
@Test
void instanciateJdbcConnection() throws SQLException, IOException {
Driver driver = new Driver();
- Connection connection = driver.connect("jdbc:calcite://asdf;config=abc", new Properties());
+ Connection connection = driver.connect("jdbc:calcite://asdf;config=abc;lex=MYSQL_ANSI", new Properties());
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
- calciteConnection.getRootSchema().add("plc4x", new Plc4xSchema(ScraperConfiguration.fromFile("src/test/resources/example.yml")));
+ calciteConnection.getRootSchema().add("plc4x", new Plc4xSchema(ScraperConfiguration.fromFile("src/test/resources/example.yml"), 10));
+
+ // ResultSet rs = connection.prepareStatement("SELECT STREAM \"test\", \"test\" * 2, \"test2\" FROM \"plc4x\".\"job1\"").executeQuery();
+ ResultSet rs = connection.prepareStatement("SELECT STREAM * FROM \"plc4x\".\"job1\" WHERE source = 'test'").executeQuery();
+
+ // Print the header
+ int count = rs.getMetaData().getColumnCount();
+ for (int i = 1; i <= count; i++) {
+ System.out.print(rs.getMetaData().getColumnLabel(i) + "(" + rs.getMetaData().getColumnTypeName(i) + ")" + "\t");
+ }
+ System.out.println("");
+
+ while (rs.next()) {
+ for (int i = 1; i <= count; i++) {
+ System.out.print(rs.getString(i) + "\t");
+ }
+ System.out.println("");
+ }
+
+ connection.close();
+ }
+
+ @Test
+ void instantiateDirect() throws IOException, SQLException {
+ Driver driver = new Driver();
+ Connection connection = driver.connect("jdbc:calcite:model=src/test/resources/model.yml;lex=MYSQL_ANSI", new Properties());
- ResultSet rs = connection.prepareStatement("SELECT STREAM * FROM \"plc4x\".\"job1\"").executeQuery();
+ // ResultSet rs = connection.prepareStatement("SELECT STREAM \"test\", \"test\" * 2, \"test2\" FROM \"plc4x\".\"job1\"").executeQuery();
+ ResultSet rs = connection.prepareStatement("SELECT * FROM \"plc4x-tables\".\"job1\"").executeQuery();
+
+ // Print the header
+ int count = rs.getMetaData().getColumnCount();
+ for (int i = 1; i <= count; i++) {
+ System.out.print(rs.getMetaData().getColumnLabel(i) + "(" + rs.getMetaData().getColumnTypeName(i) + ")" + "\t");
+ }
+ System.out.println("");
+
+ int row = 1;
while (rs.next()) {
- System.out.print("Spalte 1: " + rs.getString(1) + "\t");
- System.out.println("Spalte 2: " + rs.getString(2));
+ System.out.print(row++ + "\t");
+ for (int i = 1; i <= count; i++) {
+ System.out.print(rs.getString(i) + "\t");
+ }
+ System.out.println("");
}
connection.close();
}
+
}
diff --git a/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java b/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java
new file mode 100644
index 0000000..c89d799
--- /dev/null
+++ b/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.plc4x.java.scraper.config.JobConfiguration;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+class Plc4XBaseTableTest implements WithAssertions {
+
+ @Test
+ void testOnBlockingQueue() {
+ ArrayBlockingQueue<Plc4xSchema.Record> queue = new ArrayBlockingQueue<>(100);
+ Plc4xStreamTable table = new Plc4xStreamTable(queue, new JobConfiguration("job1", 100,
+ Collections.emptyList(),
+ Collections.singletonMap("key", "address")));
+
+ Map<String, Object> objects = Collections.singletonMap("key", "value");
+ queue.add(new Plc4xSchema.Record(Instant.now(), "", objects));
+
+ Enumerator<Object[]> enumerator = table.scan(null).enumerator();
+
+ assertThat(enumerator.moveNext()).isTrue();
+ assertThat(enumerator.current()).containsExactly("value");
+ }
+}
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4xTableTest.java b/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4xTableTest.java
deleted file mode 100644
index 6d4fb17..0000000
--- a/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4xTableTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.plc4x;
-
-import org.apache.calcite.linq4j.Enumerator;
-import org.assertj.core.api.WithAssertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.concurrent.ArrayBlockingQueue;
-
-class Plc4xTableTest implements WithAssertions {
-
- @Test
- void testOnBlockingQueue() {
- ArrayBlockingQueue<Object[]> queue = new ArrayBlockingQueue<Object[]>(100);
- Plc4xTable table = new Plc4xTable(queue, null);
-
- Object[] objects = new Object[0];
- queue.add(objects);
-
- Enumerator<Object[]> enumerator = table.scan(null).enumerator();
-
- assertThat(enumerator.moveNext()).isTrue();
- assertThat(enumerator.current()).isEqualTo(objects);
- }
-}
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/test/resources/example.yml b/integrations/apache-calcite/src/test/resources/example.yml
index 93b58e2..7294ab0 100644
--- a/integrations/apache-calcite/src/test/resources/example.yml
+++ b/integrations/apache-calcite/src/test/resources/example.yml
@@ -19,12 +19,14 @@
---
sources:
test: test:test
+ test2: test:test2
jobs:
- name: job1
- scrapeRate: 1000
+ scrapeRate: 10
sources:
- test
+ - test2
fields:
test: 'RANDOM/test:Integer'
test2: 'RANDOM/test:String'
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/test/resources/logback.xml b/integrations/apache-calcite/src/test/resources/logback.xml
new file mode 100644
index 0000000..dd243bd
--- /dev/null
+++ b/integrations/apache-calcite/src/test/resources/logback.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://ch.qos.logback/xml/ns/logback
+ https://raw.githubusercontent.com/enricopulatzo/logback-XSD/master/src/main/xsd/logback.xsd">
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="info">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
\ No newline at end of file
diff --git a/integrations/apache-calcite/src/test/resources/model.yml b/integrations/apache-calcite/src/test/resources/model.yml
new file mode 100644
index 0000000..07b7479
--- /dev/null
+++ b/integrations/apache-calcite/src/test/resources/model.yml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# A JSON model of a simple Calcite schema.
+#
+version: 1.0
+defaultSchema: PLC4X
+schemas:
+- name: PLC4X
+ type: custom
+ factory: org.apache.plc4x.Plc4xSchemaFactory
+ operand:
+ config: /Users/julian/Develop/incubator-plc4x/integrations/apache-calcite/src/test/resources/example.yml
+ limit: -1
+- name: PLC4X-TABLES
+ type: custom
+ factory: org.apache.plc4x.Plc4xSchemaFactory
+ operand:
+ config: /Users/julian/Develop/incubator-plc4x/integrations/apache-calcite/src/test/resources/example.yml
+ limit: 100
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
index 2fd50f7..5549432 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
@@ -43,7 +43,7 @@ public class JobConfiguration {
* @param fields Map from field alias (how it is named in the result map) to plc4x field query
*/
@JsonCreator
- JobConfiguration(@JsonProperty(value = "name", required = true) String name,
+ public JobConfiguration(@JsonProperty(value = "name", required = true) String name,
@JsonProperty(value = "scrapeRate", required = true) int scrapeRate,
@JsonProperty(value = "sources", required = true) List<String> sources,
@JsonProperty(value = "fields", required = true) Map<String, String> fields) {