You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/14 17:08:18 UTC
[plc4x] 03/04: - Removed the kafka connect sink
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch feature/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 328a9431512c2ac0a1799836a58b05da8e78ac09
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Aug 13 16:19:57 2019 +0200
- Removed the kafka connect sink
---
plc4j/integrations/apache-kafka/pom.xml | 4 +-
.../org/apache/plc4x/kafka/Plc4xSinkConnector.java | 76 --------------
.../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 116 ---------------------
.../apache/plc4x/kafka/Plc4xSourceConnector.java | 20 +---
4 files changed, 3 insertions(+), 213 deletions(-)
diff --git a/plc4j/integrations/apache-kafka/pom.xml b/plc4j/integrations/apache-kafka/pom.xml
index 3e38ccb..857f412 100644
--- a/plc4j/integrations/apache-kafka/pom.xml
+++ b/plc4j/integrations/apache-kafka/pom.xml
@@ -41,11 +41,11 @@
<artifactId>plc4j-api</artifactId>
<version>0.5.0-SNAPSHOT</version>
</dependency>
- <!--dependency>
+ <dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-scraper</artifactId>
<version>0.5.0-SNAPSHOT</version>
- </dependency-->
+ </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
deleted file mode 100644
index 7e0b477..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.plc4x.kafka.util.VersionUtil;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class Plc4xSinkConnector extends SinkConnector {
-
- private static final String URL_CONFIG = "url";
- private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
- private static final ConfigDef CONFIG_DEF =
- new ConfigDef().define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
-
- private String url;
-
- @Override
- public Class<? extends Task> taskClass() {
- return Plc4xSinkTask.class;
- }
-
- @Override
- public List<Map<String, String>> taskConfigs(int maxTasks) {
- List<Map<String, String>> configs = new LinkedList<>();
- for (int i = 0; i < maxTasks; i++) {
- Map<String, String> taskConfig = new HashMap<>();
- taskConfig.put(URL_CONFIG, url);
- configs.add(taskConfig);
- }
- return configs;
- }
-
- @Override
- public void start(Map<String, String> props) {
- AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
- url = config.getString(URL_CONFIG);
- }
-
- @Override
- public void stop() {}
-
- @Override
- public ConfigDef config() {
- return CONFIG_DEF;
- }
-
- @Override
- public String version() {
- return VersionUtil.getVersion();
- }
-
-}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
deleted file mode 100644
index 855e759..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.kafka.util.VersionUtil;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-public class Plc4xSinkTask extends SinkTask {
-
- private String url;
-
- private PlcConnection plcConnection;
-
- @Override
- public String version() {
- return VersionUtil.getVersion();
- }
-
- @Override
- public void start(Map<String, String> props) {
- /*AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
- url = config.getString(Plc4xSinkConnector.URL_CONFIG);*/
-
- openConnection();
-
- if (!plcConnection.getMetadata().canWrite()) {
- throw new ConnectException("Writing not supported on this connection");
- }
- }
-
- @Override
- public void stop() {
- closeConnection();
- }
-
- @Override
- public void put(Collection<SinkRecord> records) {
- for (SinkRecord record: records) {
- String query = record.key().toString();
- Object value = record.value();
- PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder();
- PlcWriteRequest plcRequest = addToBuilder(builder, query, value).build();
- doWrite(plcRequest);
- }
- }
-
- // TODO: fix this
- private PlcWriteRequest.Builder addToBuilder(PlcWriteRequest.Builder builder, String query, Object obj) {
- Class<?> type = obj.getClass();
-
- if (type.equals(Integer.class)) {
- int value = (int) obj;
- builder.addItem(query, query, value);
- } else if (type.equals(String.class)) {
- String value = (String) obj;
- builder.addItem(query, query, value);
- }
-
- return builder;
- }
-
- private void openConnection() {
- try {
- plcConnection = new PlcDriverManager().getConnection(url);
- plcConnection.connect();
- } catch (PlcConnectionException e) {
- throw new ConnectException("Could not establish a PLC connection", e);
- }
- }
-
- private void closeConnection() {
- if (plcConnection != null) {
- try {
- plcConnection.close();
- } catch (Exception e) {
- throw new PlcRuntimeException("Caught exception while closing connection to PLC", e);
- }
- }
- }
-
- private void doWrite(PlcWriteRequest request) {
- try {
- request.execute().get();
- } catch (ExecutionException | InterruptedException e) {
- throw new ConnectException("Caught exception during write", e);
- }
- }
-
-}
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 200dff5..ffb6b35 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -33,25 +33,7 @@ public class Plc4xSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(Plc4xSourceConnector.class);
- private static final String TOPIC_CONFIG = "topic";
- private static final String TOPIC_DOC = "Kafka topic to publish to";
-
- private static final String QUERIES_CONFIG = "queries";
- private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
-
- private static final String JSON_CONFIG = "json.url";
- private static final String JSON_DEFAULT = "";
- private static final String JSON_DOC = "JSON configuration";
-
- private static final String RATE_CONFIG = "rate";
- private static final Integer RATE_DEFAULT = 1000;
- private static final String RATE_DOC = "Polling rate";
-
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
- .define(QUERIES_CONFIG, ConfigDef.Type.LIST, new LinkedList<>(), ConfigDef.Importance.HIGH, QUERIES_DOC)
- .define(JSON_CONFIG, ConfigDef.Type.STRING, JSON_DEFAULT, ConfigDef.Importance.HIGH, JSON_DOC)
- .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
+ private static final ConfigDef CONFIG_DEF = new ConfigDef();
private SourceConfig sourceConfig;