You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2020/11/03 09:49:12 UTC
[plc4x] branch bug/kafka-source-cpu updated: Ooops
This is an automated email from the ASF dual-hosted git repository.
hutcheb pushed a commit to branch bug/kafka-source-cpu
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/bug/kafka-source-cpu by this push:
new 8cd3bfd Ooops
8cd3bfd is described below
commit 8cd3bfd500a17b391b1297f127435ccbd5b0e140
Author: hutcheb <be...@gmail.com>
AuthorDate: Tue Nov 3 04:48:17 2020 -0500
Ooops
---
.../org/apache/plc4x/kafka/Plc4xSinkConnector.java | 149 ---------------------
.../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 121 -----------------
.../java/org/apache/plc4x/kafka/config/Sink.java | 39 ------
.../org/apache/plc4x/kafka/config/SinkConfig.java | 66 ---------
4 files changed, 375 deletions(-)
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
deleted file mode 100644
index 253843a..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.kafka.common.config.Config;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigValue;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.plc4x.kafka.config.Job;
-import org.apache.plc4x.kafka.config.JobReference;
-import org.apache.plc4x.kafka.config.Sink;
-import org.apache.plc4x.kafka.config.SinkConfig;
-import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class Plc4xSinkConnector extends SinkConnector {
-
- private static final Logger log = LoggerFactory.getLogger(Plc4xSinkConnector.class);
-
- public static final String DEFAULT_TOPIC_CONFIG = "default-topic";
- private static final String DEFAULT_TOPIC_DOC = "Default topic to be used, if not otherwise configured.";
-
- public static final String SINK_CONFIG = "sinks";
- private static final String SINK_DOC = "List of sink names that will be configured.";
-
- private static final String CONNECTION_STRING_CONFIG = "connectionString";
- private static final String TOPIC_CONFIG = "topic";
-
-
- private SinkConfig sinkConfig;
-
- @Override
- public void start(Map<String, String> props) {
- sinkConfig = SinkConfig.fromPropertyMap(props);
- }
-
- @Override
- public void stop() {
- sinkConfig = null;
- }
-
- @Override
- public Class<? extends Task> taskClass() {
- return Plc4xSinkTask.class;
- }
-
- @Override
- public List<Map<String, String>> taskConfigs(int maxTasks) {
-
- // For each configured source we'll start a dedicated scraper instance collecting
- // all the scraper jobs enabled for this source.
- List<Map<String, String>> configs = new LinkedList<>();
- for (Sink sink : sinkConfig.getSinks()) {
- // Create a new task configuration.
- Map<String, String> taskConfig = new HashMap<>();
- taskConfig.put(Plc4xSinkTask.CONNECTION_NAME_CONFIG, sink.getName());
- taskConfig.put(Plc4xSinkTask.PLC4X_CONNECTION_STRING_CONFIG, sink.getConnectionString());
- configs.add(taskConfig);
- }
- return configs;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Config validate(Map<String, String> connectorConfigs) {
- ////////////////////////////////////////////////////
- // Get the static part of the config
- Config config = super.validate(connectorConfigs);
-
- ////////////////////////////////////////////////////
- // Add the dynamic parts of the config
-
- // Find the important config elements
- String defaultTopic = null;
- ConfigValue sinks = null;
-
- for (ConfigValue configValue : config.configValues()) {
- switch (configValue.name()) {
- case DEFAULT_TOPIC_CONFIG:
- defaultTopic = (String) configValue.value();
- break;
- case SINK_CONFIG:
- sinks = configValue;
- break;
- default:
- // Just ignore the others.
- }
- }
-
- // Configure the sinks
- if(sinks != null) {
- final List<String> sinkNames = (List<String>) sinks.value();
- for (String sinkName : sinkNames) {
- String connectionStringConfig = SINK_CONFIG + "." + sinkName + "." + CONNECTION_STRING_CONFIG;
- final ConfigValue sinkConnectionStringConfigValue = new ConfigValue(connectionStringConfig);
- config.configValues().add(sinkConnectionStringConfigValue);
- String connectionString = connectorConfigs.get(connectionStringConfig);
- sinkConnectionStringConfigValue.value();
- if (connectionString == null) {
- sinkConnectionStringConfigValue.addErrorMessage(connectionStringConfig + " is mandatory");
- } else {
- // TODO: Check if the connection string is valid.
-
- String sinkTopicConfig = SINK_CONFIG + "." + sinkName + "." + TOPIC_CONFIG;
- final ConfigValue sinkTopicConfigValue = new ConfigValue(sinkTopicConfig);
- config.configValues().add(sinkTopicConfigValue);
- String sinkTopic = connectorConfigs.get(sinkTopicConfig);
- sinkTopicConfigValue.value(sinkTopic);
- }
- }
- }
-
- return config;
- }
-
- @Override
- public ConfigDef config() {
- return new ConfigDef()
- .define(DEFAULT_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, DEFAULT_TOPIC_DOC)
- .define(SINK_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, SINK_DOC);
- }
-
- @Override
- public String version() {
- return VersionUtil.getVersion();
- }
-
-}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
deleted file mode 100644
index b662c42..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.data.Date;
-import org.apache.kafka.connect.data.*;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
-import org.apache.plc4x.java.scraper.exception.ScraperException;
-import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
-import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
-import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
-import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
-import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-
-/**
- * Source Connector Task polling the data source at a given rate.
- * A timer thread is scheduled which sets the fetch flag to true every rate milliseconds.
- * When poll() is invoked, the calling thread waits until the fetch flag is set for WAIT_LIMIT_MILLIS.
- * If the flag does not become true, the method returns null, otherwise a fetch is performed.
- */
-public class Plc4xSinkTask extends SinkTask {
-
- private static final Logger log = LoggerFactory.getLogger(Plc4xSinkTask.class);
-
- /*
- * Config of the task.
- */
- static final String CONNECTION_NAME_CONFIG = "connection-name";
- private static final String CONNECTION_NAME_STRING_DOC = "Connection Name";
-
- static final String PLC4X_CONNECTION_STRING_CONFIG = "plc4x-connection-string";
- private static final String PLC4X_CONNECTION_STRING_DOC = "PLC4X Connection String";
-
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(CONNECTION_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, CONNECTION_NAME_STRING_DOC)
- .define(PLC4X_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PLC4X_CONNECTION_STRING_DOC);
-
- /*
- * Configuration of the output.
- */
- private static final String SINK_NAME_FIELD = "sink-name";
- private static final String JOB_NAME_FIELD = "job-name";
-
- private static final Schema KEY_SCHEMA =
- new SchemaBuilder(Schema.Type.STRUCT)
- .field(SINK_NAME_FIELD, Schema.STRING_SCHEMA)
- .field(JOB_NAME_FIELD, Schema.STRING_SCHEMA)
- .build();
-
- @Override
- public String version() {
- return VersionUtil.getVersion();
- }
-
- private PlcDriverManager plcDriverManager;
-
- @Override
- public void start(Map<String, String> props) {
- AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
- String connectionName = config.getString(CONNECTION_NAME_CONFIG);
- String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
- Map<String, String> topics = new HashMap<>();
-
- //try {
- log.info("Creating Pooled PLC4x driver manager");
- plcDriverManager = new PooledPlcDriverManager();
- //} catch (PlcConnectionException e) {
- // log.error("Error starting the scraper", e.toString());
- //}
- }
-
- @Override
- public void stop() {
- synchronized (this) {
- notifyAll(); // wake up thread waiting in awaitFetch
- }
- }
-
- @Override
- public void put(Collection<SinkRecord> records) {
- if (records.isEmpty()) {
- return;
- }
- log.info(records.toString());
- return;
- }
-}
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java
deleted file mode 100644
index 07ba33f..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.config;
-
-public class Sink {
-
- private final String name;
- private final String connectionString;
-
- public Sink(String name, String connectionString) {
- this.name = name;
- this.connectionString = connectionString;
- }
-
- public String getName() {
- return name;
- }
-
- public String getConnectionString() {
- return connectionString;
- }
-
-}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java
deleted file mode 100644
index 29de508..0000000
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.config;
-
-import org.apache.plc4x.kafka.Plc4xSinkConnector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SinkConfig {
-
- private static final String CONNECTION_STRING_CONFIG = "connectionString";
- private static final String TOPIC_CONFIG = "topic";
-
- private final List<Sink> sinks;
-
- public static SinkConfig fromPropertyMap(Map<String, String> properties) {
- String defaultTopic = properties.getOrDefault(Plc4xSinkConnector.DEFAULT_TOPIC_CONFIG, null);
-
- String[] sinkNames = properties.getOrDefault(Plc4xSinkConnector.SINK_CONFIG, "").split(",");
- List<Sink> sinks = new ArrayList<>(sinkNames.length);
- for (String sinkName : sinkNames) {
- String connectionString = properties.get(
- Plc4xSinkConnector.SINK_CONFIG + "." + sinkName + "." + CONNECTION_STRING_CONFIG);
- String sinkTopic = properties.getOrDefault(
- Plc4xSinkConnector.SINK_CONFIG + "." + sinkName + "." + TOPIC_CONFIG, defaultTopic);
- Sink sink = new Sink(sinkName, connectionString);
- sinks.add(sink);
- }
-
- return new SinkConfig(sinks);
- }
-
- public SinkConfig(List<Sink> sinks) {
- this.sinks = sinks;
- }
-
- public List<Sink> getSinks() {
- return sinks;
- }
-
- public Sink getSink(String sinkName) {
- if(sinks == null) {
- return null;
- }
- return sinks.stream().filter(sink -> sink.getName().equals(sinkName)).findFirst().orElse(null);
- }
-}