You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2020/11/03 09:49:12 UTC

[plc4x] branch bug/kafka-source-cpu updated: Ooops

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch bug/kafka-source-cpu
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/bug/kafka-source-cpu by this push:
     new 8cd3bfd  Ooops
8cd3bfd is described below

commit 8cd3bfd500a17b391b1297f127435ccbd5b0e140
Author: hutcheb <be...@gmail.com>
AuthorDate: Tue Nov 3 04:48:17 2020 -0500

    Ooops
---
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java | 149 ---------------------
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 121 -----------------
 .../java/org/apache/plc4x/kafka/config/Sink.java   |  39 ------
 .../org/apache/plc4x/kafka/config/SinkConfig.java  |  66 ---------
 4 files changed, 375 deletions(-)

diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
deleted file mode 100644
index 253843a..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.kafka.common.config.Config;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigValue;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.plc4x.kafka.config.Job;
-import org.apache.plc4x.kafka.config.JobReference;
-import org.apache.plc4x.kafka.config.Sink;
-import org.apache.plc4x.kafka.config.SinkConfig;
-import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class Plc4xSinkConnector extends SinkConnector {
-
-    private static final Logger log = LoggerFactory.getLogger(Plc4xSinkConnector.class);
-
-    public static final String DEFAULT_TOPIC_CONFIG = "default-topic";
-    private static final String DEFAULT_TOPIC_DOC = "Default topic to be used, if not otherwise configured.";
-
-    public static final String SINK_CONFIG = "sinks";
-    private static final String SINK_DOC = "List of sink names that will be configured.";
-
-    private static final String CONNECTION_STRING_CONFIG = "connectionString";
-    private static final String TOPIC_CONFIG = "topic";
-
-
-    private SinkConfig sinkConfig;
-
-    @Override
-    public void start(Map<String, String> props) {
-        sinkConfig = SinkConfig.fromPropertyMap(props);
-    }
-
-    @Override
-    public void stop() {
-        sinkConfig = null;
-    }
-
-    @Override
-    public Class<? extends Task> taskClass() {
-        return Plc4xSinkTask.class;
-    }
-
-    @Override
-    public List<Map<String, String>> taskConfigs(int maxTasks) {
-
-        // For each configured source we'll start a dedicated scraper instance collecting
-        // all the scraper jobs enabled for this source.
-        List<Map<String, String>> configs = new LinkedList<>();
-        for (Sink sink : sinkConfig.getSinks()) {
-            // Create a new task configuration.
-            Map<String, String> taskConfig = new HashMap<>();
-            taskConfig.put(Plc4xSinkTask.CONNECTION_NAME_CONFIG, sink.getName());
-            taskConfig.put(Plc4xSinkTask.PLC4X_CONNECTION_STRING_CONFIG, sink.getConnectionString());
-            configs.add(taskConfig);
-        }
-        return configs;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public Config validate(Map<String, String> connectorConfigs) {
-        ////////////////////////////////////////////////////
-        // Get the static part of the config
-        Config config = super.validate(connectorConfigs);
-
-        ////////////////////////////////////////////////////
-        // Add the dynamic parts of the config
-
-        // Find the important config elements
-        String defaultTopic = null;
-        ConfigValue sinks = null;
-
-        for (ConfigValue configValue : config.configValues()) {
-            switch (configValue.name()) {
-                case DEFAULT_TOPIC_CONFIG:
-                    defaultTopic = (String) configValue.value();
-                    break;
-                case SINK_CONFIG:
-                    sinks = configValue;
-                    break;
-                default:
-                    // Just ignore the others.
-            }
-        }
-
-        // Configure the sinks
-        if(sinks != null) {
-            final List<String> sinkNames = (List<String>) sinks.value();
-            for (String sinkName : sinkNames) {
-                String connectionStringConfig = SINK_CONFIG + "." + sinkName + "." + CONNECTION_STRING_CONFIG;
-                final ConfigValue sinkConnectionStringConfigValue = new ConfigValue(connectionStringConfig);
-                config.configValues().add(sinkConnectionStringConfigValue);
-                String connectionString = connectorConfigs.get(connectionStringConfig);
-                sinkConnectionStringConfigValue.value();
-                if (connectionString == null) {
-                    sinkConnectionStringConfigValue.addErrorMessage(connectionStringConfig + " is mandatory");
-                } else {
-                    // TODO: Check if the connection string is valid.
-
-                    String sinkTopicConfig = SINK_CONFIG + "." + sinkName + "." + TOPIC_CONFIG;
-                    final ConfigValue sinkTopicConfigValue = new ConfigValue(sinkTopicConfig);
-                    config.configValues().add(sinkTopicConfigValue);
-                    String sinkTopic = connectorConfigs.get(sinkTopicConfig);
-                    sinkTopicConfigValue.value(sinkTopic);
-                }
-            }
-        }
-
-        return config;
-    }
-
-    @Override
-    public ConfigDef config() {
-        return new ConfigDef()
-            .define(DEFAULT_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, DEFAULT_TOPIC_DOC)
-            .define(SINK_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, SINK_DOC);
-    }
-
-    @Override
-    public String version() {
-        return VersionUtil.getVersion();
-    }
-
-}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
deleted file mode 100644
index b662c42..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.data.Date;
-import org.apache.kafka.connect.data.*;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
-import org.apache.plc4x.java.scraper.exception.ScraperException;
-import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
-import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
-import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
-import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
-import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-
-/**
- * Source Connector Task polling the data source at a given rate.
- * A timer thread is scheduled which sets the fetch flag to true every rate milliseconds.
- * When poll() is invoked, the calling thread waits until the fetch flag is set for WAIT_LIMIT_MILLIS.
- * If the flag does not become true, the method returns null, otherwise a fetch is performed.
- */
-public class Plc4xSinkTask extends SinkTask {
-
-    private static final Logger log = LoggerFactory.getLogger(Plc4xSinkTask.class);
-
-    /*
-     * Config of the task.
-     */
-    static final String CONNECTION_NAME_CONFIG = "connection-name";
-    private static final String CONNECTION_NAME_STRING_DOC = "Connection Name";
-
-    static final String PLC4X_CONNECTION_STRING_CONFIG = "plc4x-connection-string";
-    private static final String PLC4X_CONNECTION_STRING_DOC = "PLC4X Connection String";
-
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(CONNECTION_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, CONNECTION_NAME_STRING_DOC)
-        .define(PLC4X_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PLC4X_CONNECTION_STRING_DOC);
-
-    /*
-     * Configuration of the output.
-     */
-    private static final String SINK_NAME_FIELD = "sink-name";
-    private static final String JOB_NAME_FIELD = "job-name";
-
-    private static final Schema KEY_SCHEMA =
-        new SchemaBuilder(Schema.Type.STRUCT)
-            .field(SINK_NAME_FIELD, Schema.STRING_SCHEMA)
-            .field(JOB_NAME_FIELD, Schema.STRING_SCHEMA)
-            .build();
-
-    @Override
-    public String version() {
-        return VersionUtil.getVersion();
-    }
-
-    private PlcDriverManager plcDriverManager;
-
-    @Override
-    public void start(Map<String, String> props) {
-        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
-        String connectionName = config.getString(CONNECTION_NAME_CONFIG);
-        String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
-        Map<String, String> topics = new HashMap<>();
-
-        //try {
-            log.info("Creating Pooled PLC4x driver manager");
-            plcDriverManager = new PooledPlcDriverManager();
-        //} catch (PlcConnectionException e) {
-        //    log.error("Error starting the scraper", e.toString());
-        //}
-    }
-
-    @Override
-    public void stop() {
-        synchronized (this) {
-            notifyAll(); // wake up thread waiting in awaitFetch
-        }
-    }
-
-    @Override
-    public void put(Collection<SinkRecord> records) {
-        if (records.isEmpty()) {
-            return;
-        }
-        log.info(records.toString());
-        return;
-    }
-}
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java
deleted file mode 100644
index 07ba33f..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.config;
-
-public class Sink {
-
-    private final String name;
-    private final String connectionString;
-
-    public Sink(String name, String connectionString) {
-        this.name = name;
-        this.connectionString = connectionString;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public String getConnectionString() {
-        return connectionString;
-    }
-
-}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java
deleted file mode 100644
index 29de508..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.config;
-
-import org.apache.plc4x.kafka.Plc4xSinkConnector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SinkConfig {
-
-    private static final String CONNECTION_STRING_CONFIG = "connectionString";
-    private static final String TOPIC_CONFIG = "topic";
-
-    private final List<Sink> sinks;
-
-    public static SinkConfig fromPropertyMap(Map<String, String> properties) {
-        String defaultTopic = properties.getOrDefault(Plc4xSinkConnector.DEFAULT_TOPIC_CONFIG, null);
-
-        String[] sinkNames = properties.getOrDefault(Plc4xSinkConnector.SINK_CONFIG, "").split(",");
-        List<Sink> sinks = new ArrayList<>(sinkNames.length);
-        for (String sinkName : sinkNames) {
-            String connectionString = properties.get(
-                Plc4xSinkConnector.SINK_CONFIG + "." + sinkName + "." + CONNECTION_STRING_CONFIG);
-            String sinkTopic = properties.getOrDefault(
-                Plc4xSinkConnector.SINK_CONFIG + "." + sinkName + "." + TOPIC_CONFIG, defaultTopic);
-            Sink sink = new Sink(sinkName, connectionString);
-            sinks.add(sink);
-        }
-
-        return new SinkConfig(sinks);
-    }
-
-    public SinkConfig(List<Sink> sinks) {
-        this.sinks = sinks;
-    }
-
-    public List<Sink> getSinks() {
-        return sinks;
-    }
-
-    public Sink getSink(String sinkName) {
-        if(sinks == null) {
-            return null;
-        }
-        return sinks.stream().filter(sink -> sink.getName().equals(sinkName)).findFirst().orElse(null);
-    }
-}