Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/14 17:08:18 UTC

[plc4x] 03/04: - Removed the kafka connect sink

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 328a9431512c2ac0a1799836a58b05da8e78ac09
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Aug 13 16:19:57 2019 +0200

    - Removed the kafka connect sink
---
 plc4j/integrations/apache-kafka/pom.xml            |   4 +-
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |  76 --------------
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 116 ---------------------
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   |  20 +---
 4 files changed, 3 insertions(+), 213 deletions(-)

diff --git a/plc4j/integrations/apache-kafka/pom.xml b/plc4j/integrations/apache-kafka/pom.xml
index 3e38ccb..857f412 100644
--- a/plc4j/integrations/apache-kafka/pom.xml
+++ b/plc4j/integrations/apache-kafka/pom.xml
@@ -41,11 +41,11 @@
       <artifactId>plc4j-api</artifactId>
       <version>0.5.0-SNAPSHOT</version>
     </dependency>
-    <!--dependency>
+    <dependency>
       <groupId>org.apache.plc4x</groupId>
       <artifactId>plc4j-scraper</artifactId>
       <version>0.5.0-SNAPSHOT</version>
-    </dependency-->
+    </dependency>
 
     <dependency>
       <groupId>com.google.code.gson</groupId>
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
deleted file mode 100644
index 7e0b477..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.plc4x.kafka.util.VersionUtil;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class Plc4xSinkConnector extends SinkConnector {
-
-    private static final String URL_CONFIG = "url";
-    private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
-    private static final ConfigDef CONFIG_DEF =
-        new ConfigDef().define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
-
-    private String url;
-
-    @Override
-    public Class<? extends Task> taskClass() {
-        return Plc4xSinkTask.class;
-    }
-
-    @Override
-    public List<Map<String, String>> taskConfigs(int maxTasks) {
-        List<Map<String, String>> configs = new LinkedList<>();
-        for (int i = 0; i < maxTasks; i++) {
-            Map<String, String> taskConfig = new HashMap<>();
-            taskConfig.put(URL_CONFIG, url);
-            configs.add(taskConfig);
-        }
-        return configs;
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
-        url = config.getString(URL_CONFIG);
-    }
-
-    @Override
-    public void stop() {}
-
-    @Override
-    public ConfigDef config() {
-        return CONFIG_DEF;
-    }
-
-    @Override
-    public String version() {
-        return VersionUtil.getVersion();
-    }
-
-}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
deleted file mode 100644
index 855e759..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.kafka.util.VersionUtil;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-public class Plc4xSinkTask extends SinkTask {
-
-    private String url;
-
-    private PlcConnection plcConnection;
-
-    @Override
-    public String version() {
-        return VersionUtil.getVersion();
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        /*AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
-        url = config.getString(Plc4xSinkConnector.URL_CONFIG);*/
-
-        openConnection();
-
-        if (!plcConnection.getMetadata().canWrite()) {
-            throw new ConnectException("Writing not supported on this connection");
-        }
-    }
-
-    @Override
-    public void stop() {
-        closeConnection();
-    }
-
-    @Override
-    public void put(Collection<SinkRecord> records) {
-        for (SinkRecord record: records) {
-            String query = record.key().toString();
-            Object value = record.value();
-            PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder();
-            PlcWriteRequest plcRequest = addToBuilder(builder, query, value).build();
-            doWrite(plcRequest);
-        }
-    }
-
-    // TODO: fix this
-    private PlcWriteRequest.Builder addToBuilder(PlcWriteRequest.Builder builder, String query, Object obj) {
-        Class<?> type = obj.getClass();
-
-        if (type.equals(Integer.class)) {
-            int value = (int) obj;
-            builder.addItem(query, query, value);
-        } else if (type.equals(String.class)) {
-            String value = (String) obj;
-            builder.addItem(query, query, value);
-        }
-
-        return builder;
-    }
-
-    private void openConnection() {
-        try {
-            plcConnection = new PlcDriverManager().getConnection(url);
-            plcConnection.connect();
-        } catch (PlcConnectionException e) {
-            throw new ConnectException("Could not establish a PLC connection", e);
-        }
-    }
-
-    private void closeConnection() {
-        if (plcConnection != null) {
-            try {
-                plcConnection.close();
-            } catch (Exception e) {
-                throw new PlcRuntimeException("Caught exception while closing connection to PLC", e);
-            }
-        }
-    }
-
-    private void doWrite(PlcWriteRequest request) {
-        try {
-            request.execute().get();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new ConnectException("Caught exception during write", e);
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 200dff5..ffb6b35 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -33,25 +33,7 @@ public class Plc4xSourceConnector extends SourceConnector {
 
     private static final Logger log = LoggerFactory.getLogger(Plc4xSourceConnector.class);
 
-    private static final String TOPIC_CONFIG = "topic";
-    private static final String TOPIC_DOC = "Kafka topic to publish to";
-
-    private static final String QUERIES_CONFIG = "queries";
-    private static final String QUERIES_DOC = "Field queries to be sent to the PLC";
-
-    private static final String JSON_CONFIG = "json.url";
-    private static final String JSON_DEFAULT = "";
-    private static final String JSON_DOC = "JSON configuration";
-
-    private static final String RATE_CONFIG = "rate";
-    private static final Integer RATE_DEFAULT = 1000;
-    private static final String RATE_DOC = "Polling rate";
-
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
-        .define(QUERIES_CONFIG, ConfigDef.Type.LIST, new LinkedList<>(), ConfigDef.Importance.HIGH, QUERIES_DOC)
-        .define(JSON_CONFIG, ConfigDef.Type.STRING, JSON_DEFAULT, ConfigDef.Importance.HIGH, JSON_DOC)
-        .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
+    private static final ConfigDef CONFIG_DEF = new ConfigDef();
 
     private SourceConfig sourceConfig;