Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2021/03/11 08:32:04 UTC

[GitHub] [bahir-flink] eskabetxe commented on a change in pull request #114: InfluxDBv2.0 Connector

eskabetxe commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r592132222



##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>

Review comment:
       This should use the same Scala version as the parent (`_2.11`, not `_2.12`).
   

##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <flink.new.version>1.12.2</flink.new.version>

Review comment:
       We should update the Flink version in the parent POM rather than overriding it here.

##########
File path: pom.xml
##########
@@ -93,7 +94,7 @@
 
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
-    <log4j2.version>2.13.3</log4j2.version>
+    <log4j2.version>2.12.1</log4j2.version>

Review comment:
       Why did you lower the log4j2 version?

##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <flink.new.version>1.12.2</flink.new.version>
+    <lombok.version>1.18.10</lombok.version>
+    <scala.binary.new.version>2.11</scala.binary.new.version>
+    <spotless.version>2.7.0</spotless.version>
+    <druid.version>0.13.0-incubating</druid.version>
+    <!--  Test Properties  -->
+    <testcontainers.version>1.15.2</testcontainers.version>
+    <hamcrest.version>2.2</hamcrest.version>
+    <google.http.client.version>1.39.0</google.http.client.version>
+  </properties>
+
+  <dependencies>
+
+    <!-- Flink  -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-base</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.projectlombok</groupId>

Review comment:
       I see that you use this for getters and log4j. Since this library could be a barrier for new committers, could you delombok those uses?

##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <flink.new.version>1.12.2</flink.new.version>
+    <lombok.version>1.18.10</lombok.version>
+    <scala.binary.new.version>2.11</scala.binary.new.version>
+    <spotless.version>2.7.0</spotless.version>
+    <druid.version>0.13.0-incubating</druid.version>
+    <!--  Test Properties  -->
+    <testcontainers.version>1.15.2</testcontainers.version>
+    <hamcrest.version>2.2</hamcrest.version>
+    <google.http.client.version>1.39.0</google.http.client.version>
+  </properties>
+
+  <dependencies>
+
+    <!-- Flink  -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-base</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>${lombok.version}</version>
+    </dependency>
+
+    <!-- InfluxDB  -->
+
+    <dependency>
+      <groupId>com.influxdb</groupId>
+      <artifactId>influxdb-client-java</artifactId>
+      <version>${influxdbClient.version}</version>
+    </dependency>
+
+    <!-- InfluxDB Line Protocol Parser by Apache Druid -->
+
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-influx-extensions</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+
+    <!-- Logging -->
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+
+    <!-- Flink Test Utils -->
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils_${scala.binary.new.version}</artifactId>
+      <version>${flink.new.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.new.version}</artifactId>
+      <version>${flink.new.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <!-- Test container -->
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>influxdb</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google.http.client.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>

Review comment:
       The use of this kind of plugin should be discussed first and, if adopted, configured in the parent POM.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {
+        if (raw.endsWith("i")) {
+            return Long.valueOf(raw.substring(0, raw.length() - 1));
+        }
+
+        return new Double(raw);
+    }
+
+    private Object parseBool(final String raw) {
+        final char first = raw.charAt(0);
+        if (first == 't' || first == 'T') {

Review comment:
       This could be reduced to:
   return (first == 't' || first == 'T');
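
A minimal sketch of the suggested simplification, written as a stand-alone class so it can be run without the connector. The class name `ParseBoolSketch` and the `boolean` return type are assumptions made for illustration; the method in the pull request returns `Object`, and the quoted diff does not show the rest of its body.

```java
// Hypothetical stand-alone version of the parseBool helper under review.
final class ParseBoolSketch {

    // Returns true when the raw token starts with 't' or 'T'
    // (covers t, T, true, True, TRUE); anything else is treated as false.
    static boolean parseBool(final String raw) {
        final char first = raw.charAt(0);
        return first == 't' || first == 'T';
    }

    public static void main(final String[] args) {
        System.out.println(parseBool("true"));
        System.out.println(parseBool("F"));
    }
}
```

Returning the comparison directly removes the if/else branch without changing which inputs are treated as true. If the method keeps its `Object` return type, the `boolean` result is autoboxed to `Boolean`.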

##########
File path: pom.xml
##########
@@ -75,6 +75,7 @@
     <module>flink-connector-akka</module>
     <module>flink-connector-flume</module>
     <module>flink-connector-influxdb</module>
+    <module>flink-connector-influxdb2</module>

Review comment:
       Why create another module?
   Couldn't the existing one be updated?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import static lombok.Lombok.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+
+public final class InfluxDBSourceBuilder<OUT> {
+
+    private InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+    // Configurations
+    private final Properties properties;
+
+    InfluxDBSourceBuilder() {
+        this.deserializationSchema = null;
+        this.properties = new Properties();
+    }
+
+    /**
+     * Sets the {@link InfluxDBDataPointDeserializer deserializer} of the {@link
+     * org.apache.flink.streaming.connectors.influxdb.common.DataPoint DataPoint} for the
+     * InfluxDBSource.
+     *
+     * @param dataPointDeserializer the deserializer for InfluxDB {@link
+     *     org.apache.flink.streaming.connectors.influxdb.common.DataPoint DataPoint}.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setDeserializer(
+            final InfluxDBDataPointDeserializer<OUT> dataPointDeserializer) {
+        this.deserializationSchema = dataPointDeserializer;
+        return this;
+    }
+
+    /**
+     * Sets the enqueue wait time, i.e., the time out of this InfluxDBSource.
+     *
+     * @param timeOut the enqueue wait time to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setEnqueueWaitTime(final long timeOut) {
+        return this.setProperty(
+                InfluxDBSourceOptions.ENQUEUE_WAIT_TIME.key(), String.valueOf(timeOut));
+    }
+
+    /**
+     * Sets the ingest queue capacity of this InfluxDBSource.
+     *
+     * @param capacity the capacity to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setIngestQueueCapacity(final int capacity) {
+        return this.setProperty(
+                InfluxDBSourceOptions.INGEST_QUEUE_CAPACITY.key(), String.valueOf(capacity));
+    }
+
+    /**
+     * Sets the maximum number of lines that should be parsed per HTTP request for this
+     * InfluxDBSource.
+     *
+     * @param max the maximum number of lines to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setMaximumLinesPerRequest(final int max) {
+        return this.setProperty(
+                InfluxDBSourceOptions.MAXIMUM_LINES_PER_REQUEST.key(), String.valueOf(max));
+    }
+
+    /**
+     * Sets the TCP port on which the split reader's HTTP server of this InfluxDBSource is running
+     * on.
+     *
+     * @param port the port to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setPort(final int port) {
+        return this.setProperty(InfluxDBSourceOptions.PORT.key(), String.valueOf(port));
+    }
+
+    /**
+     * Set arbitrary properties for the InfluxDBSource. The valid keys can be found in {@link
+     * InfluxDBSourceOptions}.
+     *
+     * @param properties the properties to set for the InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setProperties(final Properties properties) {
+        this.properties.putAll(properties);
+        return this;
+    }
+
+    /**
+     * Build the {@link InfluxDBSource}.
+     *
+     * @return a InfluxDBSource with the settings made for this builder.
+     */
+    public InfluxDBSource<OUT> build() {
+        this.sanityCheck();
+        return new InfluxDBSource<>(this.properties, this.deserializationSchema);
+    }
+
+    // ------------- private helpers  --------------
+    /**
+     * Set an arbitrary property for the InfluxDBSource. The valid keys can be found in {@link
+     * InfluxDBSourceOptions}.
+     *
+     * @param key the key of the property.
+     * @param value the value of the property.
+     * @return this InfluxDBSourceBuilder.
+     */
+    private InfluxDBSourceBuilder<OUT> setProperty(final String key, final String value) {
+        this.properties.setProperty(key, value);
+        return this;
+    }
+
+    private void sanityCheck() {
+        checkNotNull(

Review comment:
       Why not use Preconditions.checkNotNull from Flink?
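
A rough sketch of what the check could look like without Lombok. Flink provides `org.apache.flink.util.Preconditions.checkNotNull(reference, errorMessage)`; so that the example runs without Flink on the classpath, it uses a local stand-in with the same shape. The class name, the `deserializer` field, and the error message are hypothetical, since the quoted diff is cut off before the arguments of the original call.

```java
// Hypothetical, simplified builder used only to illustrate the null check.
final class SourceBuilderSketch {

    // Local stand-in with the same shape as Flink's
    // Preconditions.checkNotNull(reference, errorMessage).
    // In the connector, a static import of Flink's method would replace it.
    static <T> T checkNotNull(final T reference, final String errorMessage) {
        if (reference == null) {
            throw new NullPointerException(errorMessage);
        }
        return reference;
    }

    // Assumed field; stands in for the deserializer held by the real builder.
    private Object deserializer;

    SourceBuilderSketch setDeserializer(final Object deserializer) {
        this.deserializer = deserializer;
        return this;
    }

    // Fails fast with a descriptive message when no deserializer was set.
    void sanityCheck() {
        checkNotNull(this.deserializer, "Deserializer must be set before calling build().");
    }

    public static void main(final String[] args) {
        new SourceBuilderSketch().setDeserializer(new Object()).sanityCheck();
        System.out.println("sanity check passed");
    }
}
```

With Flink's own utility, the only change in the builder would be the static import; the call site stays the same and the Lombok dependency is no longer needed for this check.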




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org