You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2023/05/09 13:57:00 UTC
[bahir-flink] 02/05: [BAHIR-308] Bump flink version to 1.15.3
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit 06fe56a73540c100f8699d2af29e396330452d95
Author: Joao Boto <bo...@boto.pro>
AuthorDate: Wed Mar 8 17:11:35 2023 +0100
[BAHIR-308] Bump flink version to 1.15.3
---
.github/workflows/maven-ci.yml | 27 +-
flink-connector-activemq/pom.xml | 6 +-
flink-connector-akka/pom.xml | 2 +-
flink-connector-flume/pom.xml | 4 +-
flink-connector-influxdb/pom.xml | 4 +-
flink-connector-influxdb2/pom.xml | 4 +-
.../connectors/influxdb/common/DataPoint.java | 3 +-
.../connectors/influxdb/common/InfluxParser.java | 9 +-
.../connectors/influxdb/sink/InfluxDBSink.java | 5 +-
.../influxdb/sink/InfluxDBSinkBuilder.java | 15 +-
.../commiter/InfluxDBCommittableSerializer.java | 3 +-
.../influxdb/sink/commiter/InfluxDBCommitter.java | 15 +-
.../sink/writer/InfluxDBPointSerializer.java | 7 +-
.../sink/writer/InfluxDBSchemaSerializer.java | 3 +-
.../influxdb/sink/writer/InfluxDBWriter.java | 17 +-
.../connectors/influxdb/sink2/InfluxDBSink.java | 62 ++++
.../{sink => sink2}/InfluxDBSinkBuilder.java | 27 +-
.../influxdb/sink2/InfluxDBSinkOptions.java | 101 ++++++
.../writer/InfluxDBSchemaSerializer.java | 5 +-
.../{sink => sink2}/writer/InfluxDBWriter.java | 102 +++---
.../connectors/influxdb/source/InfluxDBSource.java | 10 +-
.../influxdb/source/InfluxDBSourceBuilder.java | 4 +-
.../source/enumerator/InfluxDBSplitEnumerator.java | 7 +-
.../connectors/influxdb/source/http/Handler.java | 5 +-
.../influxdb/source/http/HealthCheckHandler.java | 3 +-
.../influxdb/source/http/WriteAPIHandler.java | 13 +-
.../source/reader/InfluxDBRecordEmitter.java | 3 +-
.../source/reader/InfluxDBSourceReader.java | 5 +-
.../source/reader/InfluxDBSplitReader.java | 22 +-
.../InfluxDBDataPointDeserializer.java | 5 +-
.../source/split/InfluxDBSplitSerializer.java | 3 +-
.../influxdb/common/InfluxParserTest.java | 10 +-
.../influxdb/sink/InfluxDBSinkBuilderTest.java | 59 ++--
.../InfluxDBSinkIntegrationTestCase.java | 49 +--
.../writer}/InfluxDBTestSerializer.java | 3 +-
.../InfluxDBSinkIntegrationTestCase.java | 54 +--
.../writer}/InfluxDBTestSerializer.java | 5 +-
.../influxdb/source/InfluxDBSourceBuilderTest.java | 4 +-
.../InfluxDBSourceIntegrationTestCase.java | 39 +--
.../reader}/InfluxDBTestDeserializer.java | 2 +-
.../influxdb/util/InfluxDBContainer.java | 76 +----
...Container.java => InfluxDBContainerCustom.java} | 49 +--
flink-connector-kudu/pom.xml | 16 +-
.../kudu/connector/ColumnSchemasFactory.java | 1 -
.../kudu/connector/CreateTableOptionsFactory.java | 1 -
.../connectors/kudu/connector/KuduFilterInfo.java | 1 -
.../connectors/kudu/connector/KuduTableInfo.java | 3 +-
.../convertor/RowResultRowDataConvertor.java | 6 +-
.../kudu/connector/failure/KuduFailureHandler.java | 1 -
.../kudu/connector/reader/KuduReader.java | 10 +-
.../kudu/connector/reader/KuduReaderConfig.java | 3 +-
.../writer/AbstractSingleOperationMapper.java | 1 -
.../kudu/connector/writer/KuduOperationMapper.java | 1 -
.../kudu/connector/writer/KuduWriter.java | 9 +-
.../kudu/connector/writer/KuduWriterConfig.java | 3 +-
.../kudu/connector/writer/PojoOperationMapper.java | 6 +-
.../connectors/kudu/format/KuduOutputFormat.java | 1 -
.../flink/connectors/kudu/streaming/KuduSink.java | 1 -
.../kudu/table/AbstractReadOnlyCatalog.java | 22 +-
.../flink/connectors/kudu/table/KuduCatalog.java | 35 +-
.../connectors/kudu/table/KuduTableFactory.java | 28 +-
.../connectors/kudu/table/KuduTableSource.java | 13 +-
.../kudu/table/UpsertOperationMapper.java | 1 -
.../kudu/table/dynamic/KuduDynamicTableSource.java | 40 ++-
.../table/dynamic/catalog/KuduDynamicCatalog.java | 32 +-
.../kudu/table/utils/KuduTableUtils.java | 12 +-
.../connectors/kudu/table/utils/KuduTypeUtils.java | 14 +-
.../connectors/kudu/connector/KuduTestBase.java | 19 +-
.../kudu/format/KuduOutputFormatTest.java | 3 +-
.../connectors/kudu/streaming/KuduSinkTest.java | 1 -
.../connectors/kudu/table/KuduCatalogTest.java | 1 -
.../kudu/table/KuduTableFactoryTest.java | 11 +-
.../kudu/table/KuduTableSourceITCase.java | 4 +-
.../connectors/kudu/table/KuduTableSourceTest.java | 18 +-
.../connectors/kudu/table/KuduTableTestUtils.java | 4 +-
.../kudu/writer/AbstractOperationTest.java | 8 +-
.../kudu/writer/PojoOperationMapperTest.java | 3 +-
.../kudu/writer/RowOperationMapperTest.java | 1 -
.../kudu/writer/TupleOpertaionMapperTest.java | 1 -
flink-connector-netty/pom.xml | 2 +-
.../netty/example/StreamSqlExample.scala | 11 +-
flink-connector-pinot/pom.xml | 14 +-
.../streaming/connectors/pinot/PinotSink.java | 376 ---------------------
.../PinotSinkGlobalCommittableSerializer.java | 83 -----
.../streaming/connectors/pinot/v2/PinotSink.java | 128 +++++++
.../connectors/pinot/v2/PinotSinkBuilder.java | 165 +++++++++
.../{ => v2}/committer/PinotSinkCommittable.java | 4 +-
.../committer}/PinotSinkCommittableSerializer.java | 3 +-
.../committer/PinotSinkCommitter.java} | 244 +++++++------
.../external/EventTimeExtractor.java} | 54 +--
.../external/JsonSerializer.java} | 33 +-
.../writer/PinotWriter.java} | 163 +++++----
.../pinot/{ => v2}/writer/PinotWriterSegment.java | 10 +-
.../writer/PinotWriterState.java} | 6 +-
.../writer/PinotWriterStateSerializer.java} | 15 +-
.../connectors/pinot/PinotClusterContainer.java | 131 +++++++
.../connectors/pinot/PinotTestHelper.java | 6 +-
.../connectors/pinot/{ => v2}/PinotSinkTest.java | 50 ++-
.../connectors/pinot/{ => v2}/PinotTestBase.java | 65 ++--
.../src/test/resources/log4j.properties | 13 +
.../src/test/resources/log4j2-test.properties | 46 +++
.../pinot-broker.conf} | 12 +-
.../pinot-controller.conf} | 16 +-
.../pinot-server.conf} | 15 +-
flink-connector-redis/pom.xml | 8 +-
.../connectors/redis/RedisITCaseBase.java | 2 +-
flink-library-siddhi/pom.xml | 4 +-
pom.xml | 14 +-
108 files changed, 1383 insertions(+), 1496 deletions(-)
diff --git a/.github/workflows/maven-ci.yml b/.github/workflows/maven-ci.yml
index 2b41574..ef344b9 100644
--- a/.github/workflows/maven-ci.yml
+++ b/.github/workflows/maven-ci.yml
@@ -28,21 +28,30 @@ jobs:
runs-on: ubuntu-latest
strategy:
+ fail-fast: false
matrix:
java: ['8', '11']
- flink-version: ['1.14.6']
- scala-version: ['2.12']
+ flink-version: ['1.15.3']
+ connector: [
+ 'flink-connector-activemq',
+ 'flink-connector-akka',
+ 'flink-connector-flume',
+ 'flink-connector-influxdb',
+ 'flink-connector-influxdb2',
+ 'flink-connector-kudu',
+ 'flink-connector-netty',
+ 'flink-connector-pinot',
+ 'flink-connector-redis',
+ 'flink-library-siddhi'
+ ]
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
- name: Set up JDK ${{ matrix.java }}
- uses: actions/setup-java@v2
+ uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
- distribution: 'adopt'
+ distribution: 'zulu'
cache: maven
- - name: Change scala version to ${{ matrix.scala-version }}
- run: ./dev/change-scala-version.sh ${{ matrix.scala-version }}
- shell: bash
- name: Build with flink ${{ matrix.flink-version }}
- run: mvn -B clean verify -Dscala-${{ matrix.scala-version }} -Dflink.version=${{ matrix.flink-version }}
+ run: mvn -B clean verify -pl ${{ matrix.connector }} -am -Dscala-2.12 -Dflink.version=${{ matrix.flink-version }}
diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml
index f70e158..cde65b6 100644
--- a/flink-connector-activemq/pom.xml
+++ b/flink-connector-activemq/pom.xml
@@ -92,16 +92,16 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>flink-test-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/flink-connector-akka/pom.xml b/flink-connector-akka/pom.xml
index b929dce..53da588 100644
--- a/flink-connector-akka/pom.xml
+++ b/flink-connector-akka/pom.xml
@@ -63,7 +63,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index e900aac..46f3b1a 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -51,11 +51,11 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <artifactId>flink-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/flink-connector-influxdb/pom.xml b/flink-connector-influxdb/pom.xml
index c272e85..1108cfa 100644
--- a/flink-connector-influxdb/pom.xml
+++ b/flink-connector-influxdb/pom.xml
@@ -49,11 +49,11 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>flink-test-utils</artifactId>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
diff --git a/flink-connector-influxdb2/pom.xml b/flink-connector-influxdb2/pom.xml
index 74c8fd1..48f26df 100644
--- a/flink-connector-influxdb2/pom.xml
+++ b/flink-connector-influxdb2/pom.xml
@@ -93,12 +93,12 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>flink-test-utils</artifactId>
</dependency>
<dependency>
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
index 5e3f86a..966c5f0 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.connectors.influxdb.common;
import com.influxdb.Arguments;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
+
+import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import javax.annotation.Nullable;
/**
* InfluxDB data point class.
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
index fd91046..0051806 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
@@ -17,10 +17,6 @@
*/
package org.apache.flink.streaming.connectors.influxdb.common;
-import java.text.ParseException;
-import java.util.List;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CommonTokenStream;
@@ -30,6 +26,11 @@ import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
import org.apache.flink.annotation.Internal;
+import javax.annotation.Nullable;
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+
/**
* This is an InfluxDB line protocol parser.
*
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
index 1b6df26..705e5a0 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
@@ -18,8 +18,6 @@
package org.apache.flink.streaming.connectors.influxdb.sink;
import com.influxdb.client.write.Point;
-import java.util.List;
-import java.util.Optional;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
@@ -32,6 +30,9 @@ import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBPointS
import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
+import java.util.List;
+import java.util.Optional;
+
/**
* This Sink implementation of InfluxDB/Line Protocol. Please use a {@link InfluxDBSinkBuilder} to
* construct a {@link InfluxDBSink}. The following example shows how to create an InfluxDBSink
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
index 6b9fb5e..e1a0105 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
@@ -17,21 +17,14 @@
*/
package org.apache.flink.streaming.connectors.influxdb.sink;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_TOKEN;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.*;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The @builder class for {@link InfluxDBSink} to make it easier for the users to construct a {@link
* InfluxDBSink}.
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
index 9f11662..44673ec 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
@@ -17,10 +17,11 @@
*/
package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
-import java.nio.ByteBuffer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import java.nio.ByteBuffer;
+
/**
* This class Serialize and deserializes the commit values. Since we are sending the timestamp value
* as a committable the Long object is (de)serialized.
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
index 3872eed..2ef038d 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
@@ -17,22 +17,23 @@
*/
package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
-
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+
/**
* The InfluxDBCommitter implements the {@link Committer} interface The InfluxDBCommitter is called
* whenever a checkpoint is set by Flink. When this class is called it writes a checkpoint data
@@ -89,7 +90,7 @@ public final class InfluxDBCommitter implements Committer<Long> {
try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
final Point point = new Point("checkpoint");
point.addField("checkpoint", "flink");
- timestamp.ifPresent(aTime -> point.time(aTime, WritePrecision.NS));
+ timestamp.ifPresent(aTime -> point.time(aTime, WritePrecision.MS));
writeApi.writePoint(point);
LOG.debug("Checkpoint data point write at {}", point.toLineProtocol());
}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java
index 4be9b7f..5ca1590 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java
@@ -18,15 +18,16 @@
package org.apache.flink.streaming.connectors.influxdb.sink.writer;
import com.influxdb.client.write.Point;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
@Internal
public final class InfluxDBPointSerializer implements SimpleVersionedSerializer<Point> {
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
index dff4a65..3871590 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
@@ -18,9 +18,10 @@
package org.apache.flink.streaming.connectors.influxdb.sink.writer;
import com.influxdb.client.write.Point;
+import org.apache.flink.api.connector.sink.SinkWriter.Context;
+
import java.io.IOException;
import java.io.Serializable;
-import org.apache.flink.api.connector.sink.SinkWriter.Context;
public interface InfluxDBSchemaSerializer<IN> extends Serializable {
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
index 5826c5c..bf3778b 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
@@ -17,17 +17,9 @@
*/
package org.apache.flink.streaming.connectors.influxdb.sink.writer;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
-
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.write.Point;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
import org.apache.flink.api.connector.sink.SinkWriter;
@@ -35,6 +27,13 @@ import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.*;
+
/**
* This Class implements the {@link SinkWriter} and it is responsible to write incoming inputs to
* InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
@@ -78,7 +77,6 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
* @param context current Flink context
* @see org.apache.flink.api.connector.sink.SinkWriter.Context
*/
- @Override
public void write(final IN in, final Context context) throws IOException {
LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
this.elements.add(this.schemaSerializer.serialize(in, context));
@@ -87,6 +85,7 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
this.writeCurrentElements();
this.elements.clear();
}
+
if (context.timestamp() != null) {
this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSink.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSink.java
new file mode 100644
index 0000000..9a46d05
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSink.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink2;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBWriter;
+
+import java.io.IOException;
+
+public class InfluxDBSink<IN> implements Sink<IN> {
+
+ private final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+ private final Configuration configuration;
+
+ InfluxDBSink(
+ final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer,
+ final Configuration configuration) {
+ this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+ this.configuration = configuration;
+ }
+
+ /**
+ * Get an InfluxDBSinkBuilder to build an {@link InfluxDBSink}.
+ *
+ * @return an InfluxDB sink builder.
+ */
+ public static <IN> InfluxDBSinkBuilder<IN> builder() {
+ return new InfluxDBSinkBuilder<>();
+ }
+
+
+ @Override
+ public SinkWriter<IN> createWriter(InitContext initContext) throws IOException {
+ final InfluxDBWriter<IN> writer =
+ new InfluxDBWriter<>(this.influxDBSchemaSerializer, this.configuration);
+ writer.setProcessingTimerService(initContext.getProcessingTimeService());
+ return writer;
+ }
+
+ public Configuration getConfiguration() {
+ return this.configuration;
+ }
+
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkBuilder.java
similarity index 87%
copy from flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
copy to flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkBuilder.java
index 6b9fb5e..e2e97f6 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkBuilder.java
@@ -15,26 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.influxdb.sink;
-
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_TOKEN;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+package org.apache.flink.streaming.connectors.influxdb.sink2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
-import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBWriter;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSinkOptions.*;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * The @builder class for {@link InfluxDBSink} to make it easier for the users to construct a {@link
- * InfluxDBSink}.
+ * The builder class for {@link org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSink} to make it easier for users to construct an {@link
+ * org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSink}.
*
* <p>The following example shows the minimum setup to create a InfluxDBSink that uses the Long
* values from a former operator and sends it to an InfluxDB instance.
@@ -191,7 +184,7 @@ public final class InfluxDBSinkBuilder<IN> {
}
/**
- * Build the {@link InfluxDBSink}.
+ * Build the {@link org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSink}.
*
* @return a InfluxDBSink with the settings made for this builder.
*/
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkOptions.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkOptions.java
new file mode 100644
index 0000000..7508434
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink2;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.InfluxDBClientOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+public final class InfluxDBSinkOptions {
+
+ private InfluxDBSinkOptions() {}
+
+ public static final ConfigOption<Boolean> WRITE_DATA_POINT_CHECKPOINT =
+ ConfigOptions.key("sink.influxDB.write.data_point.checkpoint")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Determines if the checkpoint data point should be written to InfluxDB or not.");
+
+ public static final ConfigOption<Integer> WRITE_BUFFER_SIZE =
+ ConfigOptions.key("sink.influxDB.write.buffer.size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "Size of the buffer to store the data before writing to InfluxDB.");
+
+ public static final ConfigOption<String> INFLUXDB_URL =
+ ConfigOptions.key("sink.influxDB.client.URL")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("InfluxDB Connection URL.");
+
+ public static final ConfigOption<String> INFLUXDB_USERNAME =
+ ConfigOptions.key("sink.influxDB.client.username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("InfluxDB username.");
+
+ public static final ConfigOption<String> INFLUXDB_PASSWORD =
+ ConfigOptions.key("sink.influxDB.client.password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("InfluxDB password.");
+
+ public static final ConfigOption<String> INFLUXDB_TOKEN =
+ ConfigOptions.key("sink.influxDB.client.token")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("InfluxDB access token.");
+
+ public static final ConfigOption<String> INFLUXDB_BUCKET =
+ ConfigOptions.key("sink.influxDB.client.bucket")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("InfluxDB bucket name.");
+
+ public static final ConfigOption<String> INFLUXDB_ORGANIZATION =
+ ConfigOptions.key("sink.influxDB.client.organization")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("InfluxDB organization name.");
+
+ public static InfluxDBClient getInfluxDBClient(final Configuration configuration) {
+ final String url = configuration.getString(INFLUXDB_URL);
+ final String username = configuration.getString(INFLUXDB_USERNAME);
+ final String password = configuration.getString(INFLUXDB_PASSWORD);
+ final String token = configuration.getString(INFLUXDB_TOKEN);
+ final String bucket = configuration.getString(INFLUXDB_BUCKET);
+ final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
+ InfluxDBClientOptions.Builder builder = InfluxDBClientOptions.builder();
+ builder = builder
+ .url(url)
+ .bucket(bucket)
+ .org(organization);
+ if (token != null) {
+ builder = builder.authenticateToken(token.toCharArray());
+ } else if (username != null && password != null) {
+ builder = builder.authenticate(username, password.toCharArray());
+ }
+ final InfluxDBClientOptions influxDBClientOptions = builder.build();
+ return InfluxDBClientFactory.create(influxDBClientOptions);
+ }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBSchemaSerializer.java
similarity index 90%
copy from flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
copy to flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBSchemaSerializer.java
index dff4a65..d8f1a69 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBSchemaSerializer.java
@@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+package org.apache.flink.streaming.connectors.influxdb.sink2.writer;
import com.influxdb.client.write.Point;
+import org.apache.flink.api.connector.sink2.SinkWriter.Context;
+
import java.io.IOException;
import java.io.Serializable;
-import org.apache.flink.api.connector.sink.SinkWriter.Context;
public interface InfluxDBSchemaSerializer<IN> extends Serializable {
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBWriter.java
similarity index 63%
copy from flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
copy to flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBWriter.java
index 5826c5c..10797ec 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBWriter.java
@@ -15,39 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.influxdb.sink.writer;
-
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+package org.apache.flink.streaming.connectors.influxdb.sink2.writer;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
-/**
- * This Class implements the {@link SinkWriter} and it is responsible to write incoming inputs to
- * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
- * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
- * of the list reaches the {@link #bufferSize}, the influxDB write API is called and all the items
- * all written. The {@link #lastTimestamp} keeps track of the biggest timestamp of the incoming
- * elements.
- *
- * @param <IN> Type of the input
- * @see WriteApi
- */
-@Internal
-public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+import static org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSinkOptions.*;
+
+public class InfluxDBWriter<IN> implements SinkWriter<IN> {
private static final Logger LOG = LoggerFactory.getLogger(InfluxDBWriter.class);
@@ -69,6 +57,10 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
this.influxDBClient = getInfluxDBClient(configuration);
}
+ public void setProcessingTimerService(final ProcessingTimeService processingTimerService) {
+ this.processingTimerService = processingTimerService;
+ }
+
/**
* This method calls the InfluxDB write API whenever the element list reaches the {@link
* #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
@@ -76,60 +68,72 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
*
* @param in incoming data
* @param context current Flink context
- * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+ * @see org.apache.flink.api.connector.sink2.SinkWriter.Context
*/
@Override
- public void write(final IN in, final Context context) throws IOException {
+ public void write(IN in, Context context) throws IOException, InterruptedException {
LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
this.elements.add(this.schemaSerializer.serialize(in, context));
+
if (this.elements.size() == this.bufferSize) {
LOG.debug("Buffer size reached preparing to write the elements.");
this.writeCurrentElements();
- this.elements.clear();
}
if (context.timestamp() != null) {
this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
}
+
}
- /**
- * This method is called whenever a checkpoint is set by Flink. It creates a list and fills it
- * up with the latest timestamp.
- *
- * @param flush
- * @return A list containing 0 or 1 element
- */
@Override
- public List<Long> prepareCommit(final boolean flush) {
- if (this.lastTimestamp == 0) {
- return Collections.emptyList();
+ public void flush(boolean flush) throws IOException, InterruptedException {
+ if (this.lastTimestamp == 0) return;
+
+ commit(Collections.singletonList(this.lastTimestamp));
+ }
+
+ public void commit(final List<Long> committables) {
+ if (this.writeCheckpoint) {
+ LOG.debug("A checkpoint is set.");
+ Optional<Long> lastTimestamp = Optional.empty();
+ if (committables.size() >= 1) {
+ lastTimestamp = Optional.ofNullable(committables.get(committables.size() - 1));
+ }
+ lastTimestamp.ifPresent(this::writeCheckpointDataPoint);
}
- final List<Long> lastTimestamp = new ArrayList<>(1);
- lastTimestamp.add(this.lastTimestamp);
- return lastTimestamp;
}
- @Override
- public List<Point> snapshotState() {
- return this.elements;
+ private void writeCheckpointDataPoint(final Long timestamp) {
+ final Point point = new Point("checkpoint")
+ .addField("checkpoint", "flink")
+ .time(timestamp, WritePrecision.MS);
+
+ writeElementsOf(Collections.singletonList(point));
+ LOG.debug("Checkpoint data point write at {}", point.toLineProtocol());
}
@Override
public void close() throws Exception {
LOG.debug("Preparing to write the elements in InfluxDB.");
this.writeCurrentElements();
+
LOG.debug("Closing the writer.");
- this.elements.clear();
+ this.influxDBClient.close();
}
- public void setProcessingTimerService(final ProcessingTimeService processingTimerService) {
- this.processingTimerService = processingTimerService;
+ private void writeCurrentElements() {
+ writeElementsOf(this.elements);
+ this.elements.clear();
}
- private void writeCurrentElements() {
+ private void writeElementsOf(List<Point> toWrite) {
+ if (toWrite.isEmpty()) return;
+
try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
- writeApi.writePoints(this.elements);
- LOG.debug("Wrote {} data points", this.elements.size());
+ writeApi.writePoints(toWrite);
+ LOG.debug("Wrote {} data points", toWrite.size());
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
index fb55eef..7c17035 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
@@ -17,14 +17,8 @@
*/
package org.apache.flink.streaming.connectors.influxdb.source;
-import java.util.function.Supplier;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.connector.source.SplitEnumerator;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.*;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -38,6 +32,8 @@ import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer
import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplitSerializer;
+import java.util.function.Supplier;
+
/**
* The Source implementation of InfluxDB. Please use a {@link InfluxDBSourceBuilder} to construct a
* {@link InfluxDBSource}. The following example shows how to create an InfluxDBSource emitting
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
index 4ff9c5c..9b05307 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
@@ -17,11 +17,11 @@
*/
package org.apache.flink.streaming.connectors.influxdb.source;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The @builder class for {@link InfluxDBSource} to make it easier for the users to construct a
* {@link InfluxDBSource}.
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
index 9ac9b23..54373c9 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
@@ -17,15 +17,16 @@
*/
package org.apache.flink.streaming.connectors.influxdb.source.enumerator;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
import org.jetbrains.annotations.Nullable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/** The enumerator class for InfluxDB source. */
@Internal
public final class InfluxDBSplitEnumerator
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
index e71b684..12d1431 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
@@ -19,11 +19,12 @@ package org.apache.flink.streaming.connectors.influxdb.source.http;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
-import java.io.IOException;
-import java.io.OutputStream;
import org.apache.flink.annotation.Internal;
import org.jetbrains.annotations.NotNull;
+import java.io.IOException;
+import java.io.OutputStream;
+
/** Abstract base handle class for creating a response */
@Internal
abstract class Handler implements HttpHandler {
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
index ec4d45d..7daee00 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
@@ -18,9 +18,10 @@
package org.apache.flink.streaming.connectors.influxdb.source.http;
import com.sun.net.httpserver.HttpExchange;
+import org.apache.flink.annotation.Internal;
+
import java.io.IOException;
import java.net.HttpURLConnection;
-import org.apache.flink.annotation.Internal;
/**
* Handles incoming health check requests from /health path. If the server is running a response
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
index 1c2bba0..c9752ed 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
@@ -18,6 +18,13 @@
package org.apache.flink.streaming.connectors.influxdb.source.http;
import com.sun.net.httpserver.HttpExchange;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -30,12 +37,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
-import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class handles the incoming requests through the path /api/v2/write. The handle function
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
index 1612bc5..08e98d5 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
@@ -17,7 +17,6 @@
*/
package org.apache.flink.streaming.connectors.influxdb.source.reader;
-import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -25,6 +24,8 @@ import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import java.io.IOException;
+
@Internal
public final class InfluxDBRecordEmitter<T> implements RecordEmitter<DataPoint, T, InfluxDBSplit> {
private final InfluxDBDataPointDeserializer<T> dataPointDeserializer;
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java
index 1b75b84..d1bf098 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java
@@ -17,8 +17,6 @@
*/
package org.apache.flink.streaming.connectors.influxdb.source.reader;
-import java.util.Map;
-import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
@@ -27,6 +25,9 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import java.util.Map;
+import java.util.function.Supplier;
+
/** The source reader for the InfluxDB line protocol. */
@Internal
public final class InfluxDBSourceReader<OUT>
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
index ddfcb33..7b19d6c 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
@@ -17,21 +17,7 @@
*/
package org.apache.flink.streaming.connectors.influxdb.source.reader;
-import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.ENQUEUE_WAIT_TIME;
-import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.INGEST_QUEUE_CAPACITY;
-import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.MAXIMUM_LINES_PER_REQUEST;
-import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.PORT;
-
import com.sun.net.httpserver.HttpServer;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -43,6 +29,14 @@ import org.apache.flink.streaming.connectors.influxdb.source.http.HealthCheckHan
import org.apache.flink.streaming.connectors.influxdb.source.http.WriteAPIHandler;
import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.*;
+
/**
* A {@link SplitReader} implementation that reads records from InfluxDB splits.
*
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
index 0773b2a..9dc9658 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
@@ -17,13 +17,14 @@
*/
package org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer;
-import java.io.IOException;
-import java.io.Serializable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import java.io.IOException;
+import java.io.Serializable;
+
/** An interface for the deserialization of InfluxDB data points. */
public interface InfluxDBDataPointDeserializer<OUT> extends Serializable, ResultTypeQueryable<OUT> {
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
index 4d1da83..624a27f 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
@@ -17,10 +17,11 @@
*/
package org.apache.flink.streaming.connectors.influxdb.source.split;
-import java.nio.ByteBuffer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import java.nio.ByteBuffer;
+
/**
* The {@link org.apache.flink.core.io.SimpleVersionedSerializer serializer} for {@link
* InfluxDBSplit}.
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java
index 71d9529..172d77f 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java
@@ -17,16 +17,14 @@
*/
package org.apache.flink.streaming.connectors.influxdb.common;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.text.ParseException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.text.ParseException;
+
+import static org.junit.jupiter.api.Assertions.*;
+
class InfluxParserTest {
@Test
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
index f8dc31c..1138aa4 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
@@ -17,12 +17,13 @@
*/
package org.apache.flink.streaming.connectors.influxdb.sink;
-import static org.junit.jupiter.api.Assertions.*;
-
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBContainer;
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBTestSerializer;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
class InfluxDBSinkBuilderTest {
@Test
@@ -33,10 +34,10 @@ class InfluxDBSinkBuilderTest {
() ->
InfluxDBSink.builder()
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
- .setInfluxDBUsername(InfluxDBContainer.username)
- .setInfluxDBPassword(InfluxDBContainer.password)
- .setInfluxDBBucket(InfluxDBContainer.bucket)
- .setInfluxDBOrganization(InfluxDBContainer.organization)
+ .setInfluxDBUsername("username")
+ .setInfluxDBPassword("password")
+ .setInfluxDBBucket("bucket")
+ .setInfluxDBOrganization("organization")
.build());
assertEquals(exception.getMessage(), "The InfluxDB URL is required but not provided.");
}
@@ -49,9 +50,9 @@ class InfluxDBSinkBuilderTest {
() ->
InfluxDBSink.builder()
.setInfluxDBUrl("http://localhost:8086")
- .setInfluxDBPassword(InfluxDBContainer.password)
- .setInfluxDBBucket(InfluxDBContainer.bucket)
- .setInfluxDBOrganization(InfluxDBContainer.organization)
+ .setInfluxDBPassword("password")
+ .setInfluxDBBucket("bucket")
+ .setInfluxDBOrganization("organization")
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
.build());
assertEquals(exception.getMessage(),
@@ -66,9 +67,9 @@ class InfluxDBSinkBuilderTest {
() ->
InfluxDBSink.builder()
.setInfluxDBUrl("http://localhost:8086")
- .setInfluxDBUsername(InfluxDBContainer.username)
- .setInfluxDBBucket(InfluxDBContainer.bucket)
- .setInfluxDBOrganization(InfluxDBContainer.organization)
+ .setInfluxDBUsername("username")
+ .setInfluxDBBucket("bucket")
+ .setInfluxDBOrganization("organization")
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
.build());
assertEquals(exception.getMessage(),
@@ -83,11 +84,11 @@ class InfluxDBSinkBuilderTest {
() ->
InfluxDBSink.builder()
.setInfluxDBUrl("http://localhost:8086")
- .setInfluxDBToken(InfluxDBContainer.token)
- .setInfluxDBUsername(InfluxDBContainer.username)
- .setInfluxDBPassword(InfluxDBContainer.password)
- .setInfluxDBBucket(InfluxDBContainer.bucket)
- .setInfluxDBOrganization(InfluxDBContainer.organization)
+ .setInfluxDBToken("token")
+ .setInfluxDBUsername("username")
+ .setInfluxDBPassword("password")
+ .setInfluxDBBucket("bucket")
+ .setInfluxDBOrganization("organization")
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
.build());
assertEquals(exception.getMessage(),
@@ -102,9 +103,9 @@ class InfluxDBSinkBuilderTest {
() ->
InfluxDBSink.builder()
.setInfluxDBUrl("http://localhost:8086")
- .setInfluxDBUsername(InfluxDBContainer.username)
- .setInfluxDBPassword(InfluxDBContainer.password)
- .setInfluxDBOrganization(InfluxDBContainer.organization)
+ .setInfluxDBUsername("username")
+ .setInfluxDBPassword("password")
+ .setInfluxDBOrganization("organization")
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
.build());
assertEquals(exception.getMessage(), "The Bucket name is required but not provided.");
@@ -118,9 +119,9 @@ class InfluxDBSinkBuilderTest {
() ->
InfluxDBSink.builder()
.setInfluxDBUrl("http://localhost:8086")
- .setInfluxDBUsername(InfluxDBContainer.username)
- .setInfluxDBPassword(InfluxDBContainer.password)
- .setInfluxDBBucket(InfluxDBContainer.bucket)
+ .setInfluxDBUsername("username")
+ .setInfluxDBPassword("password")
+ .setInfluxDBBucket("bucket")
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
.build());
assertEquals(exception.getMessage(), "The Organization name is required but not provided.");
@@ -134,10 +135,10 @@ class InfluxDBSinkBuilderTest {
() ->
InfluxDBSink.builder()
.setInfluxDBUrl("http://localhost:8086")
- .setInfluxDBUsername(InfluxDBContainer.username)
- .setInfluxDBPassword(InfluxDBContainer.password)
- .setInfluxDBBucket(InfluxDBContainer.bucket)
- .setInfluxDBOrganization(InfluxDBContainer.organization)
+ .setInfluxDBUsername("username")
+ .setInfluxDBPassword("password")
+ .setInfluxDBBucket("bucket")
+ .setInfluxDBOrganization("organization")
.build());
assertEquals(exception.getMessage(), "Serialization schema is required but not provided.");
}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkIntegrationTestCase.java
similarity index 85%
copy from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
copy to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkIntegrationTestCase.java
index 138b017..76b4751 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkIntegrationTestCase.java
@@ -15,41 +15,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.influxdb;
-
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+package org.apache.flink.streaming.connectors.influxdb.sink;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSink;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBTestSerializer;
import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBContainer;
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestSerializer;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
@Testcontainers
class InfluxDBSinkIntegrationTestCase extends TestLogger {
@Container
- public static final InfluxDBContainer<?> influxDBContainer =
+ public static final InfluxDBContainer influxDBContainer =
InfluxDBContainer.createWithDefaultTag();
private static final List<Long> SOURCE_DATA = Arrays.asList(1L, 2L, 3L);
@@ -81,15 +80,17 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
InfluxDBSink.builder()
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
.setInfluxDBUrl(influxDBContainer.getUrl())
- .setInfluxDBUsername(InfluxDBContainer.username)
- .setInfluxDBPassword(InfluxDBContainer.password)
- .setInfluxDBBucket(InfluxDBContainer.bucket)
- .setInfluxDBOrganization(InfluxDBContainer.organization)
+ .setInfluxDBUsername(influxDBContainer.getUsername())
+ .setInfluxDBPassword(influxDBContainer.getPassword())
+ .setInfluxDBBucket(influxDBContainer.getBucket())
+ .setInfluxDBOrganization(influxDBContainer.getOrganization())
.setWriteBufferSize(2)
.addCheckpointDataPoint(true)
.build();
- env.addSource(new FiniteTestSource(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO)
+ env.addSource(new FiniteTestSource<>(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO)
+ .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>noWatermarks()
+ .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis()))
.sinkTo(influxDBSink);
env.execute();
@@ -100,7 +101,9 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
assertEquals(actualWrittenPoints.size(), EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.size());
final List<String> actualCheckpoints = queryCheckpoints(client);
- assertTrue(actualCheckpoints.size() >= 4);
+
+ // Checkpoints are triggered 4 (or more) times, but as the watermark does not change, this should result in only 2 effective points
+ assertEquals(2, actualCheckpoints.size());
}
// ---------------- private helper methods --------------------
@@ -121,7 +124,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
"from(bucket: \"%s\") |> "
+ "range(start: -1h) |> "
+ "filter(fn:(r) => r._measurement == \"test\")",
- InfluxDBContainer.bucket);
+ influxDBContainer.getBucket());
final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
for (final FluxTable table : tables) {
for (final FluxRecord record : table.getRecords()) {
@@ -139,7 +142,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
"from(bucket: \"%s\") |> "
+ "range(start: -1h) |> "
+ "filter(fn:(r) => r._measurement == \"checkpoint\")",
- InfluxDBContainer.bucket);
+ influxDBContainer.getBucket());
final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
for (final FluxTable table : tables) {
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBTestSerializer.java
similarity index 89%
copy from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
copy to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBTestSerializer.java
index c48ed98..17ada08 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBTestSerializer.java
@@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.influxdb.util;
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
import com.influxdb.client.write.Point;
import org.apache.flink.api.connector.sink.SinkWriter.Context;
-import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
import org.jetbrains.annotations.Nullable;
public class InfluxDBTestSerializer implements InfluxDBSchemaSerializer<Long> {
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkIntegrationTestCase.java
similarity index 82%
rename from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
rename to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkIntegrationTestCase.java
index 138b017..7f35ae5 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkIntegrationTestCase.java
@@ -15,48 +15,45 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.influxdb;
-
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+package org.apache.flink.streaming.connectors.influxdb.sink2;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSink;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBTestSerializer;
import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBContainer;
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestSerializer;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
@Testcontainers
class InfluxDBSinkIntegrationTestCase extends TestLogger {
@Container
- public static final InfluxDBContainer<?> influxDBContainer =
+ public static final InfluxDBContainer influxDBContainer =
InfluxDBContainer.createWithDefaultTag();
private static final List<Long> SOURCE_DATA = Arrays.asList(1L, 2L, 3L);
- // FiniteTestSource emits list of elements twice
private static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
- Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream())
+ SOURCE_DATA.stream()
.map(x -> new InfluxDBTestSerializer().serialize(x, null).toLineProtocol())
.collect(Collectors.toList());
@@ -81,15 +78,16 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
InfluxDBSink.builder()
.setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
.setInfluxDBUrl(influxDBContainer.getUrl())
- .setInfluxDBUsername(InfluxDBContainer.username)
- .setInfluxDBPassword(InfluxDBContainer.password)
- .setInfluxDBBucket(InfluxDBContainer.bucket)
- .setInfluxDBOrganization(InfluxDBContainer.organization)
- .setWriteBufferSize(2)
+ .setInfluxDBUsername(influxDBContainer.getUsername())
+ .setInfluxDBPassword(influxDBContainer.getPassword())
+ .setInfluxDBBucket(influxDBContainer.getBucket())
+ .setInfluxDBOrganization(influxDBContainer.getOrganization())
.addCheckpointDataPoint(true)
.build();
- env.addSource(new FiniteTestSource(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO)
+ env.addSource(new FiniteTestSource<>(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO)
+ .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>noWatermarks()
+ .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis()))
.sinkTo(influxDBSink);
env.execute();
@@ -97,10 +95,12 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
final InfluxDBClient client = getInfluxDBClient(influxDBSink.getConfiguration());
final List<String> actualWrittenPoints = queryWrittenData(client);
- assertEquals(actualWrittenPoints.size(), EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.size());
+ assertEquals(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.size(), actualWrittenPoints.size());
final List<String> actualCheckpoints = queryCheckpoints(client);
- assertTrue(actualCheckpoints.size() >= 4);
+
+ // Checkpoints are triggered 4 (or more) times, but as the watermark does not change, this should result in only 2 effective points
+ assertEquals(2, actualCheckpoints.size());
}
// ---------------- private helper methods --------------------
@@ -121,7 +121,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
"from(bucket: \"%s\") |> "
+ "range(start: -1h) |> "
+ "filter(fn:(r) => r._measurement == \"test\")",
- InfluxDBContainer.bucket);
+ influxDBContainer.getBucket());
final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
for (final FluxTable table : tables) {
for (final FluxRecord record : table.getRecords()) {
@@ -139,7 +139,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
"from(bucket: \"%s\") |> "
+ "range(start: -1h) |> "
+ "filter(fn:(r) => r._measurement == \"checkpoint\")",
- InfluxDBContainer.bucket);
+ influxDBContainer.getBucket());
final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
for (final FluxTable table : tables) {
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBTestSerializer.java
similarity index 85%
rename from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
rename to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBTestSerializer.java
index c48ed98..c6a7b55 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBTestSerializer.java
@@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.influxdb.util;
+package org.apache.flink.streaming.connectors.influxdb.sink2.writer;
import com.influxdb.client.write.Point;
-import org.apache.flink.api.connector.sink.SinkWriter.Context;
-import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.api.connector.sink2.SinkWriter.Context;
import org.jetbrains.annotations.Nullable;
public class InfluxDBTestSerializer implements InfluxDBSchemaSerializer<Long> {
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java
index 4045b8c..71fa714 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java
@@ -17,11 +17,11 @@
*/
package org.apache.flink.streaming.connectors.influxdb.source;
+import org.junit.jupiter.api.Test;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import org.junit.jupiter.api.Test;
-
class InfluxDBSourceBuilderTest {
@Test
void shouldNotBuildSourceWhenSchemaDeserializerIsNotProvided() {
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceIntegrationTestCase.java
similarity index 90%
rename from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.java
rename to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceIntegrationTestCase.java
index 8165533..1ce50e2 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceIntegrationTestCase.java
@@ -15,21 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.influxdb;
+package org.apache.flink.streaming.connectors.influxdb.source;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import com.google.api.client.http.ByteArrayContent;
-import com.google.api.client.http.GenericUrl;
-import com.google.api.client.http.HttpBackOffIOExceptionHandler;
-import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
-import com.google.api.client.http.HttpContent;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpRequestFactory;
-import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.*;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.util.ExponentialBackOff;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBTestDeserializer;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.*;
+
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.ServerSocket;
@@ -37,19 +36,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSource;
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestDeserializer;
-import org.apache.flink.util.TestLogger;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Integration test for the InfluxDB source for Flink. */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestDeserializer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBTestDeserializer.java
similarity index 94%
rename from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestDeserializer.java
rename to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBTestDeserializer.java
index d231d89..11d4f8e 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestDeserializer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBTestDeserializer.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.influxdb.util;
+package org.apache.flink.streaming.connectors.influxdb.source.reader;
import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
index b8a0d69..6c7bcb9 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
@@ -17,93 +17,25 @@
*/
package org.apache.flink.streaming.connectors.influxdb.util;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.MountableFile;
-public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
- extends GenericContainer<SELF> {
+public final class InfluxDBContainer extends InfluxDBContainerCustom {
private static final Logger LOG = LoggerFactory.getLogger(InfluxDBContainer.class);
- public static final Integer INFLUXDB_PORT = 8086;
-
- private static final String REGISTRY = "quay.io";
- private static final String REPOSITORY = "influxdb/influxdb";
- private static final String TAG = "v2.0.2";
private static final DockerImageName DEFAULT_IMAGE_NAME =
- DockerImageName.parse(String.format("%s/%s:%s", REGISTRY, REPOSITORY, TAG));
- private static final int NO_CONTENT_STATUS_CODE = 204;
- private static final String INFLUX_SETUP_SH = "influx-setup.sh";
-
- public static final String username = "test-user";
- public static final String password = "test-password";
- public static final String token = "access-token";
- public static final String bucket = "test-bucket";
- public static final String organization = "test-org";
- private static final int retention = 0;
- private static final String retentionUnit = "ns";
+ DockerImageName.parse("quay.io/influxdb/influxdb:v2.0.2");
private InfluxDBContainer(final DockerImageName imageName) {
super(imageName);
- imageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
- this.setEnv();
- this.waitStrategy =
- (new WaitAllStrategy())
- .withStrategy(
- Wait.forHttp("/ping")
- .withBasicCredentials(username, password)
- .forStatusCode(NO_CONTENT_STATUS_CODE))
- .withStrategy(Wait.forListeningPort());
- this.addExposedPort(INFLUXDB_PORT);
- this.startContainer();
}
- public static InfluxDBContainer<?> createWithDefaultTag() {
+ public static InfluxDBContainer createWithDefaultTag() {
LOG.info("Starting influxDB test container with default tag {}", DEFAULT_IMAGE_NAME);
- return new InfluxDBContainer<>(DEFAULT_IMAGE_NAME);
- }
-
- private void setEnv() {
- this.addEnv("INFLUXDB_USER", username);
- this.addEnv("INFLUXDB_PASSWORD", password);
- this.addEnv("INFLUXDB_BUCKET", bucket);
- this.addEnv("INFLUXDB_ORG", organization);
- this.addEnv("INFLUXDB_RETENTION", String.valueOf(retention));
- this.addEnv("INFLUXDB_RETENTION_UNIT", retentionUnit);
+ return new InfluxDBContainer(DEFAULT_IMAGE_NAME);
}
- private void startContainer() {
- this.withCopyFileToContainer(
- MountableFile.forClasspathResource(INFLUX_SETUP_SH),
- String.format("%s", INFLUX_SETUP_SH));
- this.start();
- this.setUpInfluxDB();
- LOG.info("Started InfluxDB container on: {}", this.getUrl());
- }
-
- private void setUpInfluxDB() {
- final ExecResult execResult;
- try {
- execResult = this.execInContainer("chmod", "-x", String.format("/%s", INFLUX_SETUP_SH));
- assertEquals(execResult.getExitCode(), 0);
- final ExecResult writeResult =
- this.execInContainer("/bin/bash", String.format("/%s", INFLUX_SETUP_SH));
- assertEquals(writeResult.getExitCode(), 0);
- } catch (final InterruptedException | IOException e) {
- LOG.error("An error occurred while setting up InfluxDB {}", e.getMessage());
- }
- }
-
- public String getUrl() {
- return "http://" + this.getHost() + ":" + this.getMappedPort(INFLUXDB_PORT);
- }
}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainerCustom.java
similarity index 75%
copy from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
copy to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainerCustom.java
index b8a0d69..8950079 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainerCustom.java
@@ -17,9 +17,6 @@
*/
package org.apache.flink.streaming.connectors.influxdb.util;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
@@ -28,30 +25,29 @@ import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;
-public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
- extends GenericContainer<SELF> {
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class InfluxDBContainerCustom
+ extends GenericContainer<InfluxDBContainerCustom> {
- private static final Logger LOG = LoggerFactory.getLogger(InfluxDBContainer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(InfluxDBContainerCustom.class);
public static final Integer INFLUXDB_PORT = 8086;
- private static final String REGISTRY = "quay.io";
- private static final String REPOSITORY = "influxdb/influxdb";
private static final String TAG = "v2.0.2";
private static final DockerImageName DEFAULT_IMAGE_NAME =
- DockerImageName.parse(String.format("%s/%s:%s", REGISTRY, REPOSITORY, TAG));
+ DockerImageName.parse("quay.io/influxdb/influxdb:v2.0.2");
private static final int NO_CONTENT_STATUS_CODE = 204;
private static final String INFLUX_SETUP_SH = "influx-setup.sh";
- public static final String username = "test-user";
- public static final String password = "test-password";
- public static final String token = "access-token";
- public static final String bucket = "test-bucket";
- public static final String organization = "test-org";
- private static final int retention = 0;
- private static final String retentionUnit = "ns";
+ private static final String username = "test-user";
+ private static final String password = "test-password";
+ private static final String bucket = "test-bucket";
+ private static final String organization = "test-org";
- private InfluxDBContainer(final DockerImageName imageName) {
+ public InfluxDBContainerCustom(final DockerImageName imageName) {
super(imageName);
imageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
this.setEnv();
@@ -67,9 +63,20 @@ public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
this.startContainer();
}
- public static InfluxDBContainer<?> createWithDefaultTag() {
- LOG.info("Starting influxDB test container with default tag {}", DEFAULT_IMAGE_NAME);
- return new InfluxDBContainer<>(DEFAULT_IMAGE_NAME);
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public String getOrganization() {
+ return organization;
}
private void setEnv() {
@@ -77,8 +84,6 @@ public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
this.addEnv("INFLUXDB_PASSWORD", password);
this.addEnv("INFLUXDB_BUCKET", bucket);
this.addEnv("INFLUXDB_ORG", organization);
- this.addEnv("INFLUXDB_RETENTION", String.valueOf(retention));
- this.addEnv("INFLUXDB_RETENTION_UNIT", retentionUnit);
}
private void startContainer() {
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 20b16b4..134d6f7 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -36,13 +36,6 @@
<dependencyManagement>
<dependencies>
- <dependency>
- <groupId>org.apache.kudu</groupId>
- <artifactId>kudu-binary</artifactId>
- <version>${kudu.version}</version>
- <classifier>${os.detected.classifier}</classifier>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
@@ -85,11 +78,11 @@
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <artifactId>flink-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -99,11 +92,6 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.kudu</groupId>
- <artifactId>kudu-binary</artifactId>
- <classifier>${os.detected.classifier}</classifier>
- </dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
index b178308..4997938 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
@@ -18,7 +18,6 @@
package org.apache.flink.connectors.kudu.connector;
import org.apache.flink.annotation.PublicEvolving;
-
import org.apache.kudu.ColumnSchema;
import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
index 4a475e9..fd9bfa4 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
@@ -18,7 +18,6 @@
package org.apache.flink.connectors.kudu.connector;
import org.apache.flink.annotation.PublicEvolving;
-
import org.apache.kudu.client.CreateTableOptions;
import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index e7a8d16..94e2e26 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -18,7 +18,6 @@ package org.apache.flink.connectors.kudu.connector;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.binary.BinaryStringData;
-
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduPredicate;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index baae8a0..655a914 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -16,9 +16,8 @@
*/
package org.apache.flink.connectors.kudu.connector;
-import org.apache.flink.annotation.PublicEvolving;
-
import org.apache.commons.lang3.Validate;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.kudu.Schema;
import org.apache.kudu.client.CreateTableOptions;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
index b7dc702..5196d7f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
@@ -17,11 +17,7 @@
package org.apache.flink.connectors.kudu.connector.convertor;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.*;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.RowResult;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
index 3c8954f..c67c6ed 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
@@ -17,7 +17,6 @@
package org.apache.flink.connectors.kudu.connector.failure;
import org.apache.flink.annotation.PublicEvolving;
-
import org.apache.kudu.client.RowError;
import java.io.IOException;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
index 6816fc3..72b734c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -16,18 +16,12 @@
*/
package org.apache.flink.connectors.kudu.connector.reader;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-
-import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanToken;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
index 468cb1e..4727488 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
@@ -16,9 +16,8 @@
*/
package org.apache.flink.connectors.kudu.connector.reader;
-import org.apache.flink.annotation.PublicEvolving;
-
import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
index d9f8219..794d56b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
@@ -17,7 +17,6 @@
package org.apache.flink.connectors.kudu.connector.writer;
import org.apache.flink.annotation.PublicEvolving;
-
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
index 4878ab3..886a3ea 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
@@ -17,7 +17,6 @@
package org.apache.flink.connectors.kudu.connector.writer;
import org.apache.flink.annotation.PublicEvolving;
-
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 59ad196..2171a43 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -21,14 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
-
-import org.apache.kudu.client.DeleteTableResponse;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
index 6c6d216..9b63494 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -16,9 +16,8 @@
*/
package org.apache.flink.connectors.kudu.connector.writer;
-import org.apache.flink.annotation.PublicEvolving;
-
import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.kudu.client.AsyncKuduClient;
import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
index db44eec..253146c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
@@ -19,11 +19,7 @@ package org.apache.flink.connectors.kudu.connector.writer;
import org.apache.flink.annotation.PublicEvolving;
import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
@PublicEvolving
public class PojoOperationMapper<T> extends AbstractSingleOperationMapper<T> {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
index 900515d..bc1b031 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
@@ -28,7 +28,6 @@ import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
index a671408..a2e3e47 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
index 2e1c63e..ffe02a8 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
@@ -19,26 +19,8 @@
package org.apache.flink.connectors.kudu.table;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
index 734e219..20ec341 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -21,29 +21,15 @@ package org.apache.flink.connectors.kudu.table;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
-import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.StringUtils;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.KuduClient;
@@ -53,19 +39,10 @@ import org.apache.kudu.shaded.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_REPLICAS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.*;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -92,7 +69,7 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
* @param kuduMasters Connection address to Kudu
*/
public KuduCatalog(String catalogName, String kuduMasters) {
- super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
+ super(catalogName, "default_database");
this.kuduMasters = kuduMasters;
this.kuduClient = createClient();
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index 9112b0a..c46ac85 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -30,32 +30,12 @@ import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.types.Row;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.*;
+import static org.apache.flink.table.descriptors.Rowtime.*;
+import static org.apache.flink.table.descriptors.Schema.*;
import static org.apache.flink.util.Preconditions.checkNotNull;
public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
index ad98e86..74daa89 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
@@ -29,11 +29,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.sources.FilterableTableSource;
-import org.apache.flink.table.sources.LimitableTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.*;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
@@ -42,12 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Optional;
+import java.util.*;
import static org.apache.flink.connectors.kudu.table.utils.KuduTableUtils.toKuduFilterInfo;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
index 847dad4..31c8d79 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
@@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
import org.apache.flink.types.Row;
-
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
index 2022cd7..cde6a13 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
@@ -27,26 +27,24 @@ import org.apache.flink.connectors.kudu.table.function.lookup.KuduRowDataLookupF
import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
-import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.*;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
+import java.util.*;
+
+import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly;
/**
* A {@link DynamicTableSource} for Kudu.
@@ -138,12 +136,28 @@ public class KuduDynamicTableSource implements ScanTableSource, SupportsProjecti
}
@Override
- public void applyProjection(int[][] projectedFields) {
+ public void applyProjection(int[][] projectedFields, DataType producedDataType) {
// parser projectFields
- this.physicalSchema = TableSchemaUtils.projectSchema(this.physicalSchema, projectedFields);
+ this.physicalSchema = projectSchema(this.physicalSchema, projectedFields);
this.projectedFields = physicalSchema.getFieldNames();
}
+    /**
+     * Builds a new {@link TableSchema} that contains only the fields selected by
+     * {@code projectedFields}.
+     *
+     * @param tableSchema schema to project; must contain physical columns only
+     * @param projectedFields field index paths, as passed to {@code applyProjection}
+     * @return a schema holding the projected top-level fields, in projection order
+     * @throws IllegalArgumentException if {@code tableSchema} contains non-physical columns
+     */
+    private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
+        // Use Flink's own Preconditions (already imported by this class) instead of the
+        // Guava copy shaded into the planner's Calcite bundle: that class is a planner
+        // internal, not public API, and may not be visible to connector code at runtime.
+        Preconditions.checkArgument(
+                containsPhysicalColumnsOnly(tableSchema),
+                "Projection is only supported for physical columns.");
+
+        FieldsDataType projectedRow =
+                (FieldsDataType)
+                        DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);
+        RowType projectedRowType = (RowType) projectedRow.getLogicalType();
+
+        // Look the name and type lists up once rather than on every loop iteration.
+        List<String> fieldNames = projectedRowType.getFieldNames();
+        List<DataType> fieldTypes = projectedRow.getChildren();
+
+        TableSchema.Builder builder = TableSchema.builder();
+        for (int i = 0; i < projectedRowType.getFieldCount(); i++) {
+            builder.field(fieldNames.get(i), fieldTypes.get(i));
+        }
+        return builder.build();
+    }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
index b531835..3dff23c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
@@ -22,21 +22,9 @@ import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.table.AbstractReadOnlyCatalog;
import org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory;
import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
@@ -52,20 +40,10 @@ import org.apache.kudu.shaded.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.*;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -86,7 +64,7 @@ public class KuduDynamicCatalog extends AbstractReadOnlyCatalog {
* @param kuduMasters Connection address to Kudu
*/
public KuduDynamicCatalog(String catalogName, String kuduMasters) {
- super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
+ super(catalogName, "default_database");
this.kuduMasters = kuduMasters;
this.kuduClient = createClient();
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
index 1d5be62..f5d7ca7 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
@@ -42,18 +42,10 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.stream.Collectors;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.*;
public class KuduTableUtils {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
index c445465..15f7be7 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
@@ -20,20 +20,8 @@ package org.apache.flink.connectors.kudu.table.utils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BooleanType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.FloatType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.TinyIntType;
-import org.apache.flink.table.types.logical.VarBinaryType;
-import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
-
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Type;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
index bcc9b2d..ac4db4c 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -36,11 +36,7 @@ import org.apache.flink.types.Row;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.*;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -76,7 +72,7 @@ public class KuduTestBase {
public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"};
private static GenericContainer<?> master;
private static List<GenericContainer<?>> tServers;
- private static String masterAddress;
+ private static HostAndPort masterAddress;
private static KuduClient kuduClient;
@BeforeAll
@@ -90,7 +86,7 @@ public class KuduTestBase {
.withNetwork(network)
.withNetworkAliases("kudu-master");
master.start();
- masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT)).toString();
+ masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT));
for (int instance = 1; instance <= NUMBER_OF_REPLICA; instance++) {
String instanceName = "kudu-tserver-" + instance;
@@ -98,8 +94,8 @@ public class KuduTestBase {
.withExposedPorts(KUDU_TSERVER_PORT)
.withCommand("tserver")
.withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT)
- .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false " +
- "--rpc_advertised_addresses=" + instanceName)
+ .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --logtostderr "
+ +" --use_hybrid_clock=false --rpc_advertised_addresses=" + instanceName)
.withNetwork(network)
.withNetworkAliases(instanceName)
.dependsOn(master);
@@ -108,8 +104,7 @@ public class KuduTestBase {
}
tServers = tServersBuilder.build();
- System.out.println(HostAndPort.fromParts(master.getHost(), master.getMappedPort(8051)).toString());
- kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build();
+ kuduClient = new KuduClient.KuduClientBuilder(masterAddress.toString()).build();
}
@AfterAll
@@ -239,7 +234,7 @@ public class KuduTestBase {
}
public String getMasterAddress() {
- return masterAddress;
+ return masterAddress.toString();
}
public KuduClient getClient() {
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
index dc8f777..f53b7c5 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
@@ -16,13 +16,12 @@
*/
package org.apache.flink.connectors.kudu.format;
-import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
import org.apache.flink.types.Row;
-
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index 6791765..1764608 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.types.Row;
-
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
index 1927631..d403d9c 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
-
import org.apache.flink.types.Row;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index f6482da..0375c68 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -40,18 +40,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.*;
public class KuduTableFactoryTest extends KuduTestBase {
private StreamTableEnvironment tableEnv;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
index d3d4a63..4a53198 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
@@ -53,7 +53,7 @@ public class KuduTableSourceITCase extends KuduTestBase {
it.forEachRemaining(results::add);
assertEquals(5, results.size());
assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", results.get(0).toString());
- tableEnv.sqlUpdate("DROP TABLE books");
+ tableEnv.executeSql("DROP TABLE books");
}
@@ -66,6 +66,6 @@ public class KuduTableSourceITCase extends KuduTestBase {
it.forEachRemaining(results::add);
assertEquals(1, results.size());
assertEquals("More Java for more dummies", results.get(0).toString());
- tableEnv.sqlUpdate("DROP TABLE books");
+ tableEnv.executeSql("DROP TABLE books");
}
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
index 43734e4..d4101aa 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
@@ -19,15 +19,10 @@ package org.apache.flink.connectors.kudu.table;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.*;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.ScalarFunctionDefinition;
import org.apache.flink.table.types.DataType;
@@ -43,14 +38,7 @@ import java.util.List;
import static java.util.Collections.singletonList;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EQUALS;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
/**
* Unit Tests for {@link KuduTableSource}.
@@ -68,7 +56,7 @@ public class KuduTableSourceTest extends KuduTestBase {
KuduTableInfo tableInfo = booksTableInfo("books", true);
setUpDatabase(tableInfo);
catalog = new KuduCatalog(getMasterAddress());
- ObjectPath op = new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "books");
+ ObjectPath op = new ObjectPath("default_database", "books");
try {
kuduTableSource = catalog.getKuduTableFactory().createTableSource(op, catalog.getTable(op));
} catch (TableNotExistException e) {
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
index 4eae7bf..affdd04 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
@@ -26,14 +26,14 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
public class KuduTableTestUtils {
public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamingMode(StreamExecutionEnvironment env) {
- EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
return tableEnv;
}
public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode() {
- EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
return tableEnv;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
index f37b40d..12d82a9 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
@@ -17,14 +17,8 @@
package org.apache.flink.connectors.kudu.writer;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
-
import org.apache.kudu.Schema;
-import org.apache.kudu.client.Delete;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.Update;
-import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.*;
import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
index 45e0b1b..f03c469 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
@@ -17,11 +17,10 @@
package org.apache.flink.connectors.kudu.writer;
-import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo;
import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
import org.apache.flink.connectors.kudu.connector.writer.PojoOperationMapper;
-
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;
import org.junit.jupiter.api.Test;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
index e737063..3aeb673 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
@@ -20,7 +20,6 @@ import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
import org.apache.flink.types.Row;
-
import org.apache.kudu.client.Operation;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
index 308a011..a52b7c1 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
@@ -20,7 +20,6 @@ import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
import org.apache.flink.connectors.kudu.connector.writer.TupleOperationMapper;
-
import org.apache.kudu.client.Operation;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
diff --git a/flink-connector-netty/pom.xml b/flink-connector-netty/pom.xml
index 6f76037..ad450e3 100644
--- a/flink-connector-netty/pom.xml
+++ b/flink-connector-netty/pom.xml
@@ -82,7 +82,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <artifactId>flink-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
index 5bca265..4403507 100644
--- a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
@@ -17,10 +17,10 @@
package org.apache.flink.streaming.connectors.netty.example
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
-import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
/**
* Simple example for demonstrating the use of SQL on a Stream Table.
@@ -40,7 +40,7 @@ object StreamSqlExample {
val param = ParameterTool.fromArgs(args)
// set up execution environment
- val envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+ val envSettings = EnvironmentSettings.newInstance().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env, envSettings)
@@ -62,8 +62,7 @@ object StreamSqlExample {
// union the two tables
val result = tEnv.sqlQuery("SELECT * FROM OrderA WHERE amount > 2")
-
- result.toAppendStream[Order].print()
+ tEnv.toDataStream(result).print()
env.execute()
}
diff --git a/flink-connector-pinot/pom.xml b/flink-connector-pinot/pom.xml
index 670504a..6b1aaab 100644
--- a/flink-connector-pinot/pom.xml
+++ b/flink-connector-pinot/pom.xml
@@ -102,7 +102,11 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
@@ -118,13 +122,19 @@ under the License.
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-java-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime-web</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-tools</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>flink-test-utils</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
deleted file mode 100644
index 7d2fe94..0000000
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.pinot;
-
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter;
-import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
-import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
-import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
-import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
-import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer;
-import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer;
-import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkWriterStateSerializer;
-import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter;
-import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
-import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment;
-import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.core.segment.name.SegmentNameGenerator;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Apache Pinot sink that stores objects from upstream Flink tasks in a Apache Pinot table. The sink
- * can be operated in {@code RuntimeExecutionMode.STREAMING} or {@code RuntimeExecutionMode.BATCH}
- * mode. But ensure to enable checkpointing when using in streaming mode.
- *
- * <p>We advise you to use the provided {@link PinotSink.Builder} to build and configure the
- * PinotSink. All the communication with the Pinot cluster's table is managed via the Pinot
- * controller. Thus you need to provide its host and port as well as the target Pinot table.
- * The {@link TableConfig} and {@link Schema} is automatically retrieved via the Pinot controller API
- * and therefore does not need to be provided.
- *
- * <p>Whenever an element is received by the sink it gets stored in a {@link PinotWriterSegment}. A
- * {@link PinotWriterSegment} represents exactly one segment that will be pushed to the Pinot
- * cluster later on. Its size is determined by the customizable {@code maxRowsPerSegment} parameter.
- * Please note that the maximum segment size that can be handled by this sink is limited by the
- * lower bound of memory available at each subTask.
- * Each subTask holds a list of {@link PinotWriterSegment}s of which at most one is active. An
- * active {@link PinotWriterSegment} is capable of accepting at least one more element. If a
- * {@link PinotWriterSegment} switches from active to inactive it flushes its
- * {@code maxRowsPerSegment} elements to disk. The data file is stored in the local filesystem's
- * temporary directory and contains serialized elements. We use the {@link JsonSerializer} to
- * serialize elements to JSON.
- *
- * <p>On checkpointing all not in-progress {@link PinotWriterSegment}s are transformed into
- * committables. As the data files need to be shared across nodes, the sink requires access to a
- * shared filesystem. We use the {@link FileSystemAdapter} for that purpose.
- * A {@link FileSystemAdapter} is capable of copying a file from the local to the shared filesystem
- * and vice-versa. A {@link PinotSinkCommittable} contains a reference to a data file on the shared
- * filesystem as well as the minimum and maximum timestamp contained in the data file. A timestamp -
- * usually the event time - is extracted from each received element via {@link EventTimeExtractor}.
- * The timestamps are later on required to follow the guideline for naming Pinot segments.
- * An eventually existent in-progress {@link PinotWriterSegment}'s state is saved in the snapshot
- * taken when checkpointing. This ensures that the at-most-once delivery guarantee can be fulfilled
- * when recovering from failures.
- *
- * <p>We use the {@link PinotSinkGlobalCommitter} to collect all created
- * {@link PinotSinkCommittable}s, create segments from the referenced data files and finally push them
- * to the Pinot table. Therefore, the minimum and maximum timestamp of all
- * {@link PinotSinkCommittable} is determined. The segment names are then generated using the
- * {@link PinotSinkSegmentNameGenerator} which gets the minimum and maximum timestamp as input.
- * The segment generation starts with downloading the referenced data file from the shared file system
- * using the provided {@link FileSystemAdapter}. Once this is was completed, we use Pinot's
- * {@link SegmentIndexCreationDriver} to generate the final segment. Each segment is thereby stored
- * in a temporary directory on the local filesystem. Next, the segment is uploaded to the Pinot
- * controller using Pinot's {@link UploadSegmentCommand}.
- *
- * <p>To ensure that possible failures are handled accordingly each segment name is checked for
- * existence within the Pinot cluster before uploading a segment. In case a segment name already
- * exists, i.e. if the last commit failed partially with some segments already been uploaded, the
- * existing segment is deleted first. When the elements since the last checkpoint are replayed the
- * minimum and maximum timestamp of all received elements will be the same. Thus the same set of
- * segment names is generated and we can delete previous segments by checking for segment name
- * presence. Note: The {@link PinotSinkSegmentNameGenerator} must be deterministic. We also provide
- * a {@link SimpleSegmentNameGenerator} which is a simple but for most users suitable segment name
- * generator.
- *
- * <p>Please note that we use the {@link GlobalCommitter} to ensure consistent segment naming. This
- * comes with performance limitations as a {@link GlobalCommitter} always runs at a parallelism of 1
- * which results in a clear bottleneck at the {@link PinotSinkGlobalCommitter} that does all the
- * computational intensive work (i.e. generating and uploading segments). In order to overcome this
- * issue we introduce a custom multithreading approach within the {@link PinotSinkGlobalCommitter}
- * to parallelize the segment creation and upload process.
- *
- * @param <IN> Type of incoming elements
- */
-public class PinotSink<IN> implements Sink<IN, PinotSinkCommittable, PinotSinkWriterState, PinotSinkGlobalCommittable> {
-
- private final String pinotControllerHost;
- private final String pinotControllerPort;
- private final String tableName;
- private final int maxRowsPerSegment;
- private final String tempDirPrefix;
- private final JsonSerializer<IN> jsonSerializer;
- private final SegmentNameGenerator segmentNameGenerator;
- private final FileSystemAdapter fsAdapter;
- private final EventTimeExtractor<IN> eventTimeExtractor;
- private final int numCommitThreads;
-
- /**
- * @param pinotControllerHost Host of the Pinot controller
- * @param pinotControllerPort Port of the Pinot controller
- * @param tableName Target table's name
- * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
- * @param tempDirPrefix Prefix for temp directories used
- * @param jsonSerializer Serializer used to convert elements to JSON
- * @param eventTimeExtractor Defines the way event times are extracted from received objects
- * @param segmentNameGenerator Pinot segment name generator
- * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes
- * @param numCommitThreads Number of threads used in the {@link PinotSinkGlobalCommitter} for committing segments
- */
- private PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName,
- int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer,
- EventTimeExtractor<IN> eventTimeExtractor,
- SegmentNameGenerator segmentNameGenerator, FileSystemAdapter fsAdapter,
- int numCommitThreads) {
- this.pinotControllerHost = checkNotNull(pinotControllerHost);
- this.pinotControllerPort = checkNotNull(pinotControllerPort);
- this.tableName = checkNotNull(tableName);
-
- checkArgument(maxRowsPerSegment > 0);
- this.maxRowsPerSegment = maxRowsPerSegment;
- this.tempDirPrefix = checkNotNull(tempDirPrefix);
- this.jsonSerializer = checkNotNull(jsonSerializer);
- this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
- this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
- this.fsAdapter = checkNotNull(fsAdapter);
- checkArgument(numCommitThreads > 0);
- this.numCommitThreads = numCommitThreads;
- }
-
- /**
- * Creates a Pinot sink writer.
- *
- * @param context InitContext
- * @param states State extracted from snapshot. This list must not have a size larger than 1
- */
- @Override
- public PinotSinkWriter<IN> createWriter(InitContext context, List<PinotSinkWriterState> states) {
- PinotSinkWriter<IN> writer = new PinotSinkWriter<>(
- context.getSubtaskId(), maxRowsPerSegment, eventTimeExtractor,
- jsonSerializer, fsAdapter
- );
-
- if (states.size() == 1) {
- writer.initializeState(states.get(0));
- } else if (states.size() > 1) {
- throw new IllegalStateException("Did not expected more than one element in states.");
- }
- return writer;
- }
-
- /**
- * The PinotSink does not use a committer. Instead a global committer is used
- *
- * @return Empty Optional
- */
- @Override
- public Optional<Committer<PinotSinkCommittable>> createCommitter() {
- return Optional.empty();
- }
-
- /**
- * Creates the global committer.
- */
- @Override
- public Optional<GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable>> createGlobalCommitter() throws IOException {
- String timeColumnName = eventTimeExtractor.getTimeColumn();
- TimeUnit segmentTimeUnit = eventTimeExtractor.getSegmentTimeUnit();
- PinotSinkGlobalCommitter committer = new PinotSinkGlobalCommitter(
- pinotControllerHost, pinotControllerPort, tableName, segmentNameGenerator,
- tempDirPrefix, fsAdapter, timeColumnName, segmentTimeUnit, numCommitThreads
- );
- return Optional.of(committer);
- }
-
- /**
- * Creates the committables' serializer.
- */
- @Override
- public Optional<SimpleVersionedSerializer<PinotSinkCommittable>> getCommittableSerializer() {
- return Optional.of(new PinotSinkCommittableSerializer());
- }
-
- /**
- * Creates the global committables' serializer.
- */
- @Override
- public Optional<SimpleVersionedSerializer<PinotSinkGlobalCommittable>> getGlobalCommittableSerializer() {
- return Optional.of(new PinotSinkGlobalCommittableSerializer());
- }
-
- /**
- * The PinotSink does not use writer states.
- *
- * @return Empty Optional
- */
- @Override
- public Optional<SimpleVersionedSerializer<PinotSinkWriterState>> getWriterStateSerializer() {
- return Optional.of(new PinotSinkWriterStateSerializer());
- }
-
- /**
- * Builder for configuring a {@link PinotSink}. This is the recommended public API.
- *
- * @param <IN> Type of incoming elements
- */
- public static class Builder<IN> {
-
- static final int DEFAULT_COMMIT_THREADS = 4;
-
- String pinotControllerHost;
- String pinotControllerPort;
- String tableName;
- int maxRowsPerSegment;
- String tempDirPrefix = "flink-connector-pinot";
- JsonSerializer<IN> jsonSerializer;
- EventTimeExtractor<IN> eventTimeExtractor;
- SegmentNameGenerator segmentNameGenerator;
- FileSystemAdapter fsAdapter;
- int numCommitThreads = DEFAULT_COMMIT_THREADS;
-
- /**
- * Defines the basic connection parameters.
- *
- * @param pinotControllerHost Host of the Pinot controller
- * @param pinotControllerPort Port of the Pinot controller
- * @param tableName Target table's name
- */
- public Builder(String pinotControllerHost, String pinotControllerPort, String tableName) {
- this.pinotControllerHost = pinotControllerHost;
- this.pinotControllerPort = pinotControllerPort;
- this.tableName = tableName;
- }
-
- /**
- * Defines the serializer used to serialize elements to JSON format.
- *
- * @param jsonSerializer JsonSerializer
- * @return Builder
- */
- public Builder<IN> withJsonSerializer(JsonSerializer<IN> jsonSerializer) {
- this.jsonSerializer = jsonSerializer;
- return this;
- }
-
- /**
- * Defines the EventTimeExtractor<IN> used to extract event times from received objects.
- *
- * @param eventTimeExtractor EventTimeExtractor
- * @return Builder
- */
- public Builder<IN> withEventTimeExtractor(EventTimeExtractor<IN> eventTimeExtractor) {
- this.eventTimeExtractor = eventTimeExtractor;
- return this;
- }
-
- /**
- * Defines the SegmentNameGenerator used to generate names for the segments pushed to Pinot.
- *
- * @param segmentNameGenerator SegmentNameGenerator
- * @return Builder
- */
- public Builder<IN> withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
- this.segmentNameGenerator = segmentNameGenerator;
- return this;
- }
-
- /**
- * Defines a basic segment name generator which will be used to generate names for the
- * segments pushed to Pinot.
- *
- * @param segmentNamePostfix Postfix which will be appended to the segment name to identify
- * segments coming from this Flink sink
- * @return Builder
- */
- public Builder<IN> withSimpleSegmentNameGenerator(String segmentNamePostfix) {
- return withSegmentNameGenerator(new SimpleSegmentNameGenerator(tableName, segmentNamePostfix));
- }
-
- /**
- * Defines the FileSystemAdapter used share data files between the {@link PinotSinkWriter} and
- * the {@link PinotSinkGlobalCommitter}.
- *
- * @param fsAdapter Adapter for interacting with the shared file system
- * @return Builder
- */
- public Builder<IN> withFileSystemAdapter(FileSystemAdapter fsAdapter) {
- this.fsAdapter = fsAdapter;
- return this;
- }
-
- /**
- * Defines the segment size via the maximum number of elements per segment.
- *
- * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
- * @return Builder
- */
- public Builder<IN> withMaxRowsPerSegment(int maxRowsPerSegment) {
- this.maxRowsPerSegment = maxRowsPerSegment;
- return this;
- }
-
- /**
- * Defines the path prefix for the files created in a node's local filesystem.
- *
- * @param tempDirPrefix Prefix for temp directories used
- * @return Builder
- */
- public Builder<IN> withTempDirectoryPrefix(String tempDirPrefix) {
- this.tempDirPrefix = tempDirPrefix;
- return this;
- }
-
- /**
- * Defines the number of threads that shall be used to commit segments in the {@link PinotSinkGlobalCommitter}.
- *
- * @param numCommitThreads Number of threads
- * @return Builder
- */
- public Builder<IN> withNumCommitThreads(int numCommitThreads) {
- this.numCommitThreads = numCommitThreads;
- return this;
- }
-
- /**
- * Finally builds the {@link PinotSink} according to the configuration.
- *
- * @return PinotSink
- */
- public PinotSink<IN> build() {
- return new PinotSink<>(
- pinotControllerHost,
- pinotControllerPort,
- tableName,
- maxRowsPerSegment,
- tempDirPrefix,
- jsonSerializer,
- eventTimeExtractor,
- segmentNameGenerator,
- fsAdapter,
- numCommitThreads
- );
- }
- }
-}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java
deleted file mode 100644
index 8e45620..0000000
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.pinot.serializer;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Serializer for {@link PinotSinkGlobalCommittable}
- */
-@Internal
-public class PinotSinkGlobalCommittableSerializer implements SimpleVersionedSerializer<PinotSinkGlobalCommittable> {
-
- private static final int CURRENT_VERSION = 1;
-
- @Override
- public int getVersion() {
- return CURRENT_VERSION;
- }
-
- @Override
- public byte[] serialize(PinotSinkGlobalCommittable pinotSinkGlobalCommittable) throws IOException {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos)) {
- out.writeLong(pinotSinkGlobalCommittable.getMinTimestamp());
- out.writeLong(pinotSinkGlobalCommittable.getMaxTimestamp());
-
- int size = pinotSinkGlobalCommittable.getDataFilePaths().size();
- out.writeInt(size);
- for (String dataFilePath : pinotSinkGlobalCommittable.getDataFilePaths()) {
- out.writeUTF(dataFilePath);
- }
- out.flush();
- return baos.toByteArray();
- }
- }
-
- @Override
- public PinotSinkGlobalCommittable deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
- switch (version) {
- case 1:
- return deserializeV1(serialized);
- default:
- throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
- }
- }
-
- private PinotSinkGlobalCommittable deserializeV1(byte[] serialized) throws IOException {
- try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
- DataInputStream in = new DataInputStream(bais)) {
- long minTimestamp = in.readLong();
- long maxTimestamp = in.readLong();
-
- long size = in.readInt();
- List<String> dataFilePaths = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- dataFilePaths.add(in.readUTF());
- }
- return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp);
- }
- }
-}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSink.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSink.java
new file mode 100644
index 0000000..b300b89
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSink.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.v2;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommitter;
+import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter;
+import org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriterState;
+import org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriterStateSerializer;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Sink that writes elements to Pinot using Flink's {@code sink2} API.
+ *
+ * <p>The sink is stateful (writer state: {@link PinotWriterState}) and commits
+ * {@link PinotSinkCommittable}s through a {@link PinotSinkCommitter}. The constructor is
+ * protected; {@link PinotSinkBuilder#build()} creates instances.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotSink<IN> implements
+        Sink<IN>,
+        StatefulSink<IN, PinotWriterState>,
+        TwoPhaseCommittingSink<IN, PinotSinkCommittable> {
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final int maxRowsPerSegment;
+    private final String tempDirPrefix;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final FileSystemAdapter fsAdapter;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final int numCommitThreads;
+
+    /**
+     * Creates the sink after validating its configuration.
+     *
+     * @param pinotControllerHost Host of the Pinot controller
+     * @param pinotControllerPort Port of the Pinot controller
+     * @param tableName Target table's name
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+     * @param tempDirPrefix Prefix for temp directories used
+     * @param jsonSerializer Serializer used to convert elements to JSON
+     * @param eventTimeExtractor Defines the way event times are extracted from received objects
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes
+     * @param numCommitThreads Number of threads used in the {@link PinotSinkCommitter} for committing segments
+     * @throws NullPointerException if any reference argument is {@code null}
+     * @throws IllegalArgumentException if {@code maxRowsPerSegment} or {@code numCommitThreads} is not positive
+     */
+    protected PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName,
+                        int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer,
+                        EventTimeExtractor<IN> eventTimeExtractor,
+                        SegmentNameGenerator segmentNameGenerator, FileSystemAdapter fsAdapter,
+                        int numCommitThreads) {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost, "pinotControllerHost must not be null");
+        this.pinotControllerPort = checkNotNull(pinotControllerPort, "pinotControllerPort must not be null");
+        this.tableName = checkNotNull(tableName, "tableName must not be null");
+
+        checkArgument(maxRowsPerSegment > 0, "maxRowsPerSegment must be positive, but was %s", maxRowsPerSegment);
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.tempDirPrefix = checkNotNull(tempDirPrefix, "tempDirPrefix must not be null");
+        this.jsonSerializer = checkNotNull(jsonSerializer, "jsonSerializer must not be null");
+        this.eventTimeExtractor = checkNotNull(eventTimeExtractor, "eventTimeExtractor must not be null");
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator, "segmentNameGenerator must not be null");
+        this.fsAdapter = checkNotNull(fsAdapter, "fsAdapter must not be null");
+        checkArgument(numCommitThreads > 0, "numCommitThreads must be positive, but was %s", numCommitThreads);
+        this.numCommitThreads = numCommitThreads;
+    }
+
+    /**
+     * Creates a writer that starts without restored state.
+     */
+    @Override
+    public PinotWriter<IN> createWriter(InitContext context) throws IOException {
+        return this.restoreWriter(context, Collections.emptyList());
+    }
+
+    /**
+     * Creates a writer from the given writer states, handing it a newly created committer.
+     *
+     * @param context InitContext
+     * @param states Writer states to restore from; empty when called from {@link #createWriter(InitContext)}
+     */
+    @Override
+    public PinotWriter<IN> restoreWriter(InitContext context, Collection<PinotWriterState> states) throws IOException {
+        return new PinotWriter<>(
+                context.getSubtaskId(), maxRowsPerSegment, eventTimeExtractor,
+                jsonSerializer, fsAdapter, states, this.createCommitter()
+        );
+    }
+
+    /**
+     * Creates the committer, taking the time column and segment time unit from the
+     * event time extractor.
+     */
+    @Override
+    public PinotSinkCommitter createCommitter() throws IOException {
+        String timeColumnName = eventTimeExtractor.getTimeColumn();
+        TimeUnit segmentTimeUnit = eventTimeExtractor.getSegmentTimeUnit();
+        return new PinotSinkCommitter(
+                pinotControllerHost, pinotControllerPort, tableName, segmentNameGenerator,
+                tempDirPrefix, fsAdapter, timeColumnName, segmentTimeUnit, numCommitThreads
+        );
+    }
+
+    /**
+     * Creates the committables' serializer.
+     */
+    @Override
+    public SimpleVersionedSerializer<PinotSinkCommittable> getCommittableSerializer() {
+        return new PinotSinkCommittableSerializer();
+    }
+
+    /**
+     * Creates the writer states' serializer.
+     */
+    @Override
+    public SimpleVersionedSerializer<PinotWriterState> getWriterStateSerializer() {
+        return new PinotWriterStateSerializer();
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkBuilder.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkBuilder.java
new file mode 100644
index 0000000..5af5c1d
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkBuilder.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.v2;
+
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommitter;
+import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+
+/**
+ * Builder for configuring a {@link PinotSink}.
+ *
+ * <p>Only {@code tempDirPrefix} and {@code numCommitThreads} have default values; every other
+ * {@code with...} setting starts out unset ({@code null} or {@code 0}). {@link #build()} passes
+ * all values to the {@link PinotSink} constructor.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotSinkBuilder<IN> {
+
+    static final int DEFAULT_COMMIT_THREADS = 4;
+
+    String pinotControllerHost;
+    String pinotControllerPort;
+    String tableName;
+    int maxRowsPerSegment;
+    String tempDirPrefix = "flink-connector-pinot";
+    JsonSerializer<IN> jsonSerializer;
+    EventTimeExtractor<IN> eventTimeExtractor;
+    SegmentNameGenerator segmentNameGenerator;
+    FileSystemAdapter fsAdapter;
+    int numCommitThreads = DEFAULT_COMMIT_THREADS;
+
+    /**
+     * Defines the basic connection parameters.
+     *
+     * @param pinotControllerHost Host of the Pinot controller
+     * @param pinotControllerPort Port of the Pinot controller
+     * @param tableName Target table's name
+     */
+    public PinotSinkBuilder(String pinotControllerHost, String pinotControllerPort, String tableName) {
+        this.pinotControllerHost = pinotControllerHost;
+        this.pinotControllerPort = pinotControllerPort;
+        this.tableName = tableName;
+    }
+
+    /**
+     * Defines the serializer used to serialize elements to JSON format.
+     *
+     * @param jsonSerializer JsonSerializer
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withJsonSerializer(JsonSerializer<IN> jsonSerializer) {
+        this.jsonSerializer = jsonSerializer;
+        return this;
+    }
+
+    /**
+     * Defines the {@code EventTimeExtractor<IN>} used to extract event times from received objects.
+     *
+     * @param eventTimeExtractor EventTimeExtractor
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withEventTimeExtractor(EventTimeExtractor<IN> eventTimeExtractor) {
+        this.eventTimeExtractor = eventTimeExtractor;
+        return this;
+    }
+
+    /**
+     * Defines the SegmentNameGenerator used to generate names for the segments pushed to Pinot.
+     *
+     * @param segmentNameGenerator SegmentNameGenerator
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
+        this.segmentNameGenerator = segmentNameGenerator;
+        return this;
+    }
+
+    /**
+     * Defines a basic segment name generator which will be used to generate names for the
+     * segments pushed to Pinot.
+     *
+     * @param segmentNamePostfix Postfix which will be appended to the segment name to identify
+     * segments coming from this Flink sink
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withSimpleSegmentNameGenerator(String segmentNamePostfix) {
+        return withSegmentNameGenerator(new SimpleSegmentNameGenerator(tableName, segmentNamePostfix));
+    }
+
+    /**
+     * Defines the FileSystemAdapter used to share data files between the {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter} and
+     * the {@link PinotSinkCommitter}.
+     *
+     * @param fsAdapter Adapter for interacting with the shared file system
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withFileSystemAdapter(FileSystemAdapter fsAdapter) {
+        this.fsAdapter = fsAdapter;
+        return this;
+    }
+
+    /**
+     * Defines the segment size via the maximum number of elements per segment.
+     *
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment; no default is set here
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withMaxRowsPerSegment(int maxRowsPerSegment) {
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        return this;
+    }
+
+    /**
+     * Defines the path prefix for the files created in a node's local filesystem.
+     *
+     * @param tempDirPrefix Prefix for temp directories used (default: {@code "flink-connector-pinot"})
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withTempDirectoryPrefix(String tempDirPrefix) {
+        this.tempDirPrefix = tempDirPrefix;
+        return this;
+    }
+
+    /**
+     * Defines the number of threads that shall be used to commit segments in the {@link PinotSinkCommitter}.
+     *
+     * @param numCommitThreads Number of threads (default: {@code DEFAULT_COMMIT_THREADS})
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withNumCommitThreads(int numCommitThreads) {
+        this.numCommitThreads = numCommitThreads;
+        return this;
+    }
+
+    /**
+     * Finally builds the {@link PinotSink} according to the configuration.
+     *
+     * @return PinotSink
+     */
+    public PinotSink<IN> build() {
+        return new PinotSink<>(
+                pinotControllerHost,
+                pinotControllerPort,
+                tableName,
+                maxRowsPerSegment,
+                tempDirPrefix,
+                jsonSerializer,
+                eventTimeExtractor,
+                segmentNameGenerator,
+                fsAdapter,
+                numCommitThreads
+        );
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittable.java
similarity index 95%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittable.java
index 5a8c655..611655c 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittable.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot.committer;
+package org.apache.flink.streaming.connectors.pinot.v2.committer;
import org.apache.flink.annotation.Internal;
@@ -26,7 +26,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The PinotSinkCommittable is required for sharing committables with the
- * {@link PinotSinkGlobalCommitter} instance
+ * {@link PinotSinkCommitter} instance
*/
@Internal
public class PinotSinkCommittable implements Serializable {
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittableSerializer.java
similarity index 94%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittableSerializer.java
index ed61de2..cce37ae 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittableSerializer.java
@@ -16,11 +16,10 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot.serializer;
+package org.apache.flink.streaming.connectors.pinot.v2.committer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
import java.io.*;
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java
similarity index 74%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java
index 46e03e4..f6e0d51 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java
@@ -15,11 +15,13 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot.committer;
+package org.apache.flink.streaming.connectors.pinot.v2.committer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.streaming.connectors.pinot.PinotControllerClient;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils;
import org.apache.pinot.common.segment.ReadMode;
@@ -39,26 +41,24 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.*;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * Committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter},
* generates segments and pushed them to the Pinot controller.
* Note: We use a custom multithreading approach to parallelize the segment creation and upload to
* overcome the performance limitations resulting from using a {@link GlobalCommitter} always
* running at a parallelism of 1.
*/
@Internal
-public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+public class PinotSinkCommitter implements Committer<PinotSinkCommittable> {
- private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PinotSinkCommitter.class);
private final String pinotControllerHost;
private final String pinotControllerPort;
@@ -84,11 +84,11 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
* @param segmentTimeUnit Unit of the time column
* @param numCommitThreads Number of threads used to commit the committables
*/
- public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort,
- String tableName, SegmentNameGenerator segmentNameGenerator,
- String tempDirPrefix, FileSystemAdapter fsAdapter,
- String timeColumnName, TimeUnit segmentTimeUnit,
- int numCommitThreads) throws IOException {
+ public PinotSinkCommitter(String pinotControllerHost, String pinotControllerPort,
+ String tableName, SegmentNameGenerator segmentNameGenerator,
+ String tempDirPrefix, FileSystemAdapter fsAdapter,
+ String timeColumnName, TimeUnit segmentTimeUnit,
+ int numCommitThreads) throws IOException {
this.pinotControllerHost = checkNotNull(pinotControllerHost);
this.pinotControllerPort = checkNotNull(pinotControllerPort);
this.tableName = checkNotNull(tableName);
@@ -110,45 +110,98 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
this.pool = Executors.newFixedThreadPool(numCommitThreads);
}
- /**
- * Identifies global committables that need to be re-committed from a list of recovered committables.
- *
- * @param globalCommittables List of global committables that are checked for required re-commit
- * @return List of global committable that need to be re-committed
- * @throws IOException
- */
+
@Override
- public List<PinotSinkGlobalCommittable> filterRecoveredCommittables(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
- // Holds identified global committables whose commit needs to be retried
- List<PinotSinkGlobalCommittable> committablesToRetry = new ArrayList<>();
+    public void commit(Collection<CommitRequest<PinotSinkCommittable>> collection) throws IOException, InterruptedException {
+        // Unwrap every commit request into its committable and hand the whole batch
+        // over to commitSink in a single call.
+        this.commitSink(
+                collection.stream()
+                        .map(request -> request.getCommittable())
+                        .collect(Collectors.toList()));
+    }
- for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
- CommitStatus commitStatus = getCommitStatus(globalCommittable);
+    /**
+     * Commits the committables as one global committable: one SegmentCommitter per data file
+     * runs on the thread pool; data files are deleted only if every segment commit succeeded.
+     *
+     * @param collection Committables to commit; an empty collection is a no-op
+     */
+    public void commitSink(Collection<PinotSinkCommittable> collection) throws IOException, InterruptedException {
+        if (collection.isEmpty()) return;
- if (commitStatus.getMissingSegmentNames().isEmpty()) {
- // All segments were already committed. Thus, we do not need to retry the commit.
- continue;
+        // List of failed global committables that can be retried later on
+        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
+        PinotSinkGlobalCommittable globalCommittable = this.combine(new ArrayList<>(collection));
+        filterRecoveredCommittables(globalCommittable);
+
+        Set<Future<Boolean>> resultFutures = new HashSet<>();
+        for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
+            String dataFilePath = globalCommittable.getDataFilePaths().get(sequenceId);
+            String segmentName = getSegmentName(globalCommittable, sequenceId);
+            // Segment committer handling the whole commit process for a single segment
+            Callable<Boolean> segmentCommitter = new SegmentCommitter(
+                    pinotControllerHost, pinotControllerPort, tempDirectory, fsAdapter,
+                    dataFilePath, segmentName, tableSchema, tableConfig, timeColumnName,
+                    segmentTimeUnit
+            );
+            resultFutures.add(pool.submit(segmentCommitter));
+        }
+
+        boolean commitSucceeded = true;
+        try {
+            for (Future<Boolean> wasSuccessful : resultFutures) {
+                // In case any of the segment commits wasn't successful we mark the whole
+                // globalCommittable as failed
+                if (!wasSuccessful.get()) {
+                    commitSucceeded = false;
+                    failedCommits.add(globalCommittable);
+                    // Once any of the commits failed, we do not need to check the remaining
+                    // ones, as we try to commit the globalCommittable next time
+                    break;
+                }
}
+        } catch (InterruptedException | ExecutionException e) {
+            // The outcome is unknown, so treat the commit as failed. Without resetting the flag
+            // the data files needed for a retry would be deleted below.
+            commitSucceeded = false;
+            if (e instanceof InterruptedException) Thread.currentThread().interrupt();
+            failedCommits.add(globalCommittable);
+            LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e);
+        }
- for (String existingSegment : commitStatus.getExistingSegmentNames()) {
- // Some but not all segments were already committed. As we cannot assure the data
- // files containing the same data as originally when recovering from failure,
- // we delete the already committed segments in order to recommit them later on.
- pinotControllerClient.deleteSegment(tableName, existingSegment);
+        if (commitSucceeded) {
+            // All segments were committed: clean up the data files on the shared file system.
+            // After a failed commit they must stay available so the commit can be retried.
+            for (String path : globalCommittable.getDataFilePaths()) {
+                fsAdapter.deleteFromSharedFileSystem(path);
}
- committablesToRetry.add(globalCommittable);
}
- return committablesToRetry;
+
+        if (failedCommits.size() > 0) {
+            LOG.error(failedCommits.toString());
+        }
+    }
+
+    /**
+     * Asks the segment name generator for the name of one segment of a global committable.
+     *
+     * @param globalCommittable Global committable supplying the minimum and maximum timestamps
+     * @param sequenceId Sequence id handed to the segment name generator
+     * @return generated segment name
+     */
+    private String getSegmentName(PinotSinkGlobalCommittable globalCommittable, int sequenceId) {
+        long minTimestamp = globalCommittable.getMinTimestamp();
+        return segmentNameGenerator.generateSegmentName(sequenceId, minTimestamp, globalCommittable.getMaxTimestamp());
}
/**
* Combines multiple {@link PinotSinkCommittable}s into one {@link PinotSinkGlobalCommittable}
* by finding the minimum and maximum timestamps from the provided {@link PinotSinkCommittable}s.
*
- * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}
+ * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter}
* @return Global committer committable
*/
- @Override
public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> committables) {
List<String> dataFilePaths = new ArrayList<>();
long minTimestamp = Long.MAX_VALUE;
@@ -167,100 +220,26 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
}
/**
- * Copies data files from shared filesystem to the local filesystem, generates segments with names
- * according to the segment naming schema and finally pushes the segments to the Pinot cluster.
- * Before pushing a segment it is checked whether there already exists a segment with that name
- * in the Pinot cluster by calling the Pinot controller. In case there is one, it gets deleted.
+ * Checks a recovered global committable and, if only some of its segments were already committed, deletes those segments so that the whole committable can be re-committed.
*
- * @param globalCommittables List of global committables
- * @return Global committables whose commit failed
+ * @param globalCommittable Recovered global committable that is checked for required re-commit
+ * @return The same global committable that was passed in
* @throws IOException
*/
- @Override
- public List<PinotSinkGlobalCommittable> commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
- // List of failed global committables that can be retried later on
- List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
-
- for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
- Set<Future<Boolean>> resultFutures = new HashSet<>();
- // Commit all segments in globalCommittable
- for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
- String dataFilePath = globalCommittable.getDataFilePaths().get(sequenceId);
- // Get segment names with increasing sequenceIds
- String segmentName = getSegmentName(globalCommittable, sequenceId);
- // Segment committer handling the whole commit process for a single segment
- Callable<Boolean> segmentCommitter = new SegmentCommitter(
- pinotControllerHost, pinotControllerPort, tempDirectory, fsAdapter,
- dataFilePath, segmentName, tableSchema, tableConfig, timeColumnName,
- segmentTimeUnit
- );
- // Submits the segment committer to the thread pool
- resultFutures.add(pool.submit(segmentCommitter));
- }
+ private PinotSinkGlobalCommittable filterRecoveredCommittables(PinotSinkGlobalCommittable globalCommittable) throws IOException {
- boolean commitSucceeded = true;
- try {
- for (Future<Boolean> wasSuccessful : resultFutures) {
- // In case any of the segment commits wasn't successful we mark the whole
- // globalCommittable as failed
- if (!wasSuccessful.get()) {
- commitSucceeded = false;
- failedCommits.add(globalCommittable);
- // Once any of the commits failed, we do not need to check the remaining
- // ones, as we try to commit the globalCommittable next time
- break;
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- // In case of an exception thrown while accessing commit status, mark the whole
- // globalCommittable as failed
- failedCommits.add(globalCommittable);
- LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e);
- }
+ CommitStatus commitStatus = getCommitStatus(globalCommittable);
- if (commitSucceeded) {
- // If commit succeeded, cleanup the data files stored on the shared file system. In
- // case the commit of at least one of the segments failed, nothing will be cleaned
- // up here to enable retrying failed commits (data files must therefore stay
- // available on the shared filesystem).
- for (String path : globalCommittable.getDataFilePaths()) {
- fsAdapter.deleteFromSharedFileSystem(path);
- }
+ if (!commitStatus.getMissingSegmentNames().isEmpty()) {
+ for (String existingSegment : commitStatus.getExistingSegmentNames()) {
+ // Some but not all segments were already committed. As we cannot assure the data
+ // files containing the same data as originally when recovering from failure,
+ // we delete the already committed segments in order to recommit them later on.
+ pinotControllerClient.deleteSegment(tableName, existingSegment);
}
}
- // Return failed commits so that they can be retried later on
- return failedCommits;
- }
-
- /**
- * Empty method.
- */
- @Override
- public void endOfInput() {
- }
-
- /**
- * Closes the Pinot controller http client, clears the created temporary directory and
- * shuts the thread pool down.
- */
- @Override
- public void close() throws IOException {
- pinotControllerClient.close();
- tempDirectory.delete();
- pool.shutdown();
- }
-
- /**
- * Helper method for generating segment names using the segment name generator.
- *
- * @param globalCommittable Global committable the segment name shall be generated from
- * @param sequenceId Incrementing counter
- * @return generated segment name
- */
- private String getSegmentName(PinotSinkGlobalCommittable globalCommittable, int sequenceId) {
- return segmentNameGenerator.generateSegmentName(sequenceId,
- globalCommittable.getMinTimestamp(), globalCommittable.getMaxTimestamp());
+ return globalCommittable;
}
/**
@@ -290,6 +269,13 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
return new CommitStatus(existingSegmentNames, missingSegmentNames);
}
+ @Override
+ public void close() throws Exception {
+ pinotControllerClient.close();
+ tempDirectory.delete();
+ pool.shutdown();
+ }
+
/**
* Wrapper for existing and missing segments in the Pinot cluster.
*/
@@ -315,7 +301,7 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
* Helper class for committing a single segment. Downloads a data file from the shared filesystem,
* generates a segment from the data file and uploads segment to the Pinot controller.
*/
- private static class SegmentCommitter implements Callable<Boolean> {
+ public static class SegmentCommitter implements Callable<Boolean> {
private static final Logger LOG = LoggerFactory.getLogger(SegmentCommitter.class);
@@ -342,11 +328,11 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
* @param timeColumnName Name of the column containing the timestamp
* @param segmentTimeUnit Unit of the time column
*/
- SegmentCommitter(String pinotControllerHost, String pinotControllerPort,
- File tempDirectory, FileSystemAdapter fsAdapter,
- String dataFilePath, String segmentName, Schema tableSchema,
- TableConfig tableConfig, String timeColumnName,
- TimeUnit segmentTimeUnit) {
+ public SegmentCommitter(String pinotControllerHost, String pinotControllerPort,
+ File tempDirectory, FileSystemAdapter fsAdapter,
+ String dataFilePath, String segmentName, Schema tableSchema,
+ TableConfig tableConfig, String timeColumnName,
+ TimeUnit segmentTimeUnit) {
this.pinotControllerHost = pinotControllerHost;
this.pinotControllerPort = pinotControllerPort;
this.tempDirectory = tempDirectory;
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/EventTimeExtractor.java
similarity index 51%
copy from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
copy to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/EventTimeExtractor.java
index 0e23e2f..755e14f 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/EventTimeExtractor.java
@@ -16,32 +16,36 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.external;
-import java.io.Serializable;
-import java.util.List;
-
-public class PinotSinkWriterState implements Serializable {
-
- private final List<String> serializedElements;
- private final long minTimestamp;
- private final long maxTimestamp;
-
- public PinotSinkWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
- this.serializedElements = serializedElements;
- this.minTimestamp = minTimestamp;
- this.maxTimestamp = maxTimestamp;
- }
+import org.apache.flink.api.connector.sink2.SinkWriter;
- public List<String> getSerializedElements() {
- return serializedElements;
- }
-
- public long getMinTimestamp() {
- return minTimestamp;
- }
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
- public long getMaxTimestamp() {
- return maxTimestamp;
- }
+/**
+ * Defines the interface for event time extractors
+ *
+ * @param <IN> Type of incoming elements
+ */
+public interface EventTimeExtractor<IN> extends Serializable {
+
+ /**
+ * Extracts event time from incoming elements.
+ *
+ * @param element Incoming element
+ * @param context Context of SinkWriter
+ * @return timestamp
+ */
+ long getEventTime(IN element, SinkWriter.Context context);
+
+ /**
+ * @return Name of column in Pinot target table that contains the timestamp.
+ */
+ String getTimeColumn();
+
+ /**
+ * @return Unit of the time column in the Pinot target table.
+ */
+ TimeUnit getSegmentTimeUnit();
}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/JsonSerializer.java
similarity index 52%
copy from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
copy to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/JsonSerializer.java
index 0e23e2f..4f366f5 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/JsonSerializer.java
@@ -16,32 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.external;
import java.io.Serializable;
-import java.util.List;
-public class PinotSinkWriterState implements Serializable {
-
- private final List<String> serializedElements;
- private final long minTimestamp;
- private final long maxTimestamp;
-
- public PinotSinkWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
- this.serializedElements = serializedElements;
- this.minTimestamp = minTimestamp;
- this.maxTimestamp = maxTimestamp;
- }
-
- public List<String> getSerializedElements() {
- return serializedElements;
- }
-
- public long getMinTimestamp() {
- return minTimestamp;
- }
+/**
+ * Defines the interface for serializing incoming elements to JSON format.
+ * The JSON format is expected during Pinot segment creation.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public abstract class JsonSerializer<IN> implements Serializable {
- public long getMaxTimestamp() {
- return maxTimestamp;
- }
+ public abstract String toJson(IN element);
}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriter.java
similarity index 65%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriter.java
index 1a84e02..4e66c1e 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriter.java
@@ -16,35 +16,34 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.writer;
import com.google.common.collect.Iterables;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
-import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommitter;
+import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
-/**
- * Accepts incoming elements and creates {@link PinotSinkCommittable}s out of them on request.
- *
- * @param <IN> Type of incoming elements
- */
-@Internal
-public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable, PinotSinkWriterState> {
+public class PinotWriter<IN> implements SinkWriter<IN>,
+ StatefulSink.StatefulSinkWriter<IN, PinotWriterState>,
+ TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, PinotSinkCommittable> {
- private static final Logger LOG = LoggerFactory.getLogger(PinotSinkWriter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PinotWriter.class);
private final int maxRowsPerSegment;
private final EventTimeExtractor<IN> eventTimeExtractor;
@@ -55,6 +54,9 @@ public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable,
private final int subtaskId;
+ private final PinotSinkCommitter committer;
+
+
/**
* @param subtaskId Subtask id provided by Flink
* @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
@@ -62,88 +64,48 @@ public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable,
* @param jsonSerializer Serializer used to convert elements to JSON
* @param fsAdapter Filesystem adapter used to save files for sharing files across nodes
*/
- public PinotSinkWriter(int subtaskId, int maxRowsPerSegment,
- EventTimeExtractor<IN> eventTimeExtractor,
- JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+ public PinotWriter(int subtaskId, int maxRowsPerSegment,
+ EventTimeExtractor<IN> eventTimeExtractor,
+ JsonSerializer<IN> jsonSerializer,
+ FileSystemAdapter fsAdapter,
+ Collection<PinotWriterState> states,
+ PinotSinkCommitter committer) {
this.subtaskId = subtaskId;
this.maxRowsPerSegment = maxRowsPerSegment;
this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
this.jsonSerializer = checkNotNull(jsonSerializer);
this.fsAdapter = checkNotNull(fsAdapter);
this.activeSegments = new ArrayList<>();
+ this.committer = committer;
+
+ if (states.size() == 1) {
+ initializeState(states.iterator().next());
+ } else if (states.size() > 1) {
+ throw new IllegalStateException("Did not expected more than one element in states.");
+ }
}
- /**
- * Takes elements from an upstream tasks and writes them into {@link PinotWriterSegment}
- *
- * @param element Object from upstream task
- * @param context SinkWriter context
- * @throws IOException
- */
@Override
- public void write(IN element, Context context) throws IOException {
+ public void write(IN element, Context context) throws IOException, InterruptedException {
final PinotWriterSegment<IN> inProgressSegment = getOrCreateInProgressSegment();
inProgressSegment.write(element, eventTimeExtractor.getEventTime(element, context));
}
- /**
- * Creates {@link PinotSinkCommittable}s from elements previously received via {@link #write}.
- * If flush is set, all {@link PinotWriterSegment}s are transformed into
- * {@link PinotSinkCommittable}s. If flush is not set, only currently non-active
- * {@link PinotSinkCommittable}s are transformed into {@link PinotSinkCommittable}s.
- * To convert a {@link PinotWriterSegment} into a {@link PinotSinkCommittable} the data gets
- * written to the shared filesystem. Moreover, minimum and maximum timestamps are identified.
- * Finally, all {@link PinotWriterSegment}s transformed into {@link PinotSinkCommittable}s are
- * removed from {@link #activeSegments}.
- *
- * @param flush Flush all currently known elements into the {@link PinotSinkCommittable}s
- * @return List of {@link PinotSinkCommittable} to process in {@link org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter}
- * @throws IOException
- */
@Override
- public List<PinotSinkCommittable> prepareCommit(boolean flush) throws IOException {
- // Identify segments to commit. If the flush argument is set all segments shall be committed.
- // Otherwise, take only those PinotWriterSegments that do not accept any more elements.
- List<PinotWriterSegment<IN>> segmentsToCommit = activeSegments.stream()
- .filter(s -> flush || !s.acceptsElements())
- .collect(Collectors.toList());
- LOG.debug("Identified {} segments to commit [subtaskId={}]", segmentsToCommit.size(), subtaskId);
-
- LOG.debug("Creating committables... [subtaskId={}]", subtaskId);
- List<PinotSinkCommittable> committables = new ArrayList<>();
- for (final PinotWriterSegment<IN> segment : segmentsToCommit) {
- committables.add(segment.prepareCommit());
+ public void flush(boolean flush) throws IOException, InterruptedException {
+ if (flush) {
+ Collection<PinotSinkCommittable> committables = prepareCommittables(flush);
+ committer.commitSink(committables);
}
- LOG.debug("Created {} committables [subtaskId={}]", committables.size(), subtaskId);
-
- // Remove all PinotWriterSegments that will be emitted within the committables.
- activeSegments.removeAll(segmentsToCommit);
- return committables;
}
- /**
- * Gets the {@link PinotWriterSegment} still accepting elements or creates a new one.
- *
- * @return {@link PinotWriterSegment} accepting at least one more element
- */
- private PinotWriterSegment<IN> getOrCreateInProgressSegment() {
- final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
- if (latestSegment == null || !latestSegment.acceptsElements()) {
- final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
- activeSegments.add(inProgressSegment);
- return inProgressSegment;
- }
- return latestSegment;
+ @Override
+ public Collection<PinotSinkCommittable> prepareCommit() throws IOException, InterruptedException {
+ return prepareCommittables(false);
}
- /**
- * Snapshots the latest PinotWriterSegment (if existent), so that the contained (and not yet
- * committed) elements can be recovered later on in case of a failure.
- *
- * @return A list containing at most one PinotSinkWriterState
- */
@Override
- public List<PinotSinkWriterState> snapshotState() {
+ public List<PinotWriterState> snapshotState(long l) throws IOException {
final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
if (latestSegment == null || !latestSegment.acceptsElements()) {
return new ArrayList<>();
@@ -155,10 +117,10 @@ public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable,
/**
* Initializes the writer according to a previously taken snapshot.
*
- * @param state PinotSinkWriterState extracted from snapshot
+ * @param state PinotWriterState extracted from snapshot
*/
- public void initializeState(PinotSinkWriterState state) {
- if (activeSegments.size() != 0) {
+ private void initializeState(PinotWriterState state) {
+ if (!activeSegments.isEmpty()) {
throw new IllegalStateException("Please call the initialization before creating the first PinotWriterSegment.");
}
// Create a new PinotWriterSegment and recover its state from the given PinotSinkWriterState
@@ -167,10 +129,47 @@ public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable,
activeSegments.add(inProgressSegment);
}
+ @Override
+ public void close() throws Exception {
+
+ }
+
/**
- * Empty method, as we do not open any connections.
+ * Gets the {@link PinotWriterSegment} still accepting elements or creates a new one.
+ *
+ * @return {@link PinotWriterSegment} accepting at least one more element
*/
- @Override
- public void close() {
+ private PinotWriterSegment<IN> getOrCreateInProgressSegment() {
+ final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+ if (latestSegment == null || !latestSegment.acceptsElements()) {
+ final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
+ activeSegments.add(inProgressSegment);
+ return inProgressSegment;
+ }
+ return latestSegment;
}
+
+ private List<PinotSinkCommittable> prepareCommittables(boolean flush) throws IOException {
+ if (activeSegments.isEmpty()) return Collections.emptyList();
+
+ // Identify segments to commit. If the flush argument is set all segments shall be committed.
+ // Otherwise, take only those PinotWriterSegments that do not accept any more elements.
+ List<PinotWriterSegment<IN>> segmentsToCommit = activeSegments.stream()
+ .filter(s -> flush || !s.acceptsElements())
+ .collect(Collectors.toList());
+ LOG.debug("Identified {} segments to commit [subtaskId={}]", segmentsToCommit.size(), subtaskId);
+
+ LOG.debug("Creating committables... [subtaskId={}]", subtaskId);
+ List<PinotSinkCommittable> committables = new ArrayList<>();
+ for (final PinotWriterSegment<IN> segment : segmentsToCommit) {
+ committables.add(segment.prepareCommit());
+ }
+ LOG.debug("Created {} committables [subtaskId={}]", committables.size(), subtaskId);
+
+ // Remove all PinotWriterSegments that will be emitted within the committables.
+ activeSegments.removeAll(segmentsToCommit);
+ return committables;
+ }
+
+
}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterSegment.java
similarity index 94%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterSegment.java
index 50be145..80e3157 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterSegment.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.writer;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
import java.io.IOException;
@@ -143,11 +143,11 @@ public class PinotWriterSegment<IN> implements Serializable {
*
* @return List of elements currently stored within the {@link PinotWriterSegment}
*/
- public PinotSinkWriterState snapshotState() {
+ public PinotWriterState snapshotState() {
if (!acceptsElements()) {
throw new IllegalStateException("Snapshots can only be created of in-progress segments.");
}
- return new PinotSinkWriterState(serializedElements, minTimestamp, maxTimestamp);
+ return new PinotWriterState(serializedElements, minTimestamp, maxTimestamp);
}
}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterState.java
similarity index 85%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterState.java
index 0e23e2f..88e5b54 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterState.java
@@ -16,18 +16,18 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.writer;
import java.io.Serializable;
import java.util.List;
-public class PinotSinkWriterState implements Serializable {
+public class PinotWriterState implements Serializable {
private final List<String> serializedElements;
private final long minTimestamp;
private final long maxTimestamp;
- public PinotSinkWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
+ public PinotWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
this.serializedElements = serializedElements;
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterStateSerializer.java
similarity index 78%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterStateSerializer.java
index 6dc7efa..fe97bc9 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterStateSerializer.java
@@ -16,21 +16,20 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot.serializer;
+package org.apache.flink.streaming.connectors.pinot.v2.writer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
/**
- * Serializer for {@link PinotSinkWriterState}
+ * Serializer for {@link PinotWriterState}
*/
@Internal
-public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer<PinotSinkWriterState> {
+public class PinotWriterStateSerializer implements SimpleVersionedSerializer<PinotWriterState> {
private static final int CURRENT_VERSION = 1;
@@ -40,7 +39,7 @@ public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer
}
@Override
- public byte[] serialize(PinotSinkWriterState writerState) throws IOException {
+ public byte[] serialize(PinotWriterState writerState) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
out.writeLong(writerState.getMinTimestamp());
@@ -57,7 +56,7 @@ public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer
}
@Override
- public PinotSinkWriterState deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
+ public PinotWriterState deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
switch (version) {
case 1:
return deserializeV1(serialized);
@@ -66,7 +65,7 @@ public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer
}
}
- private PinotSinkWriterState deserializeV1(byte[] serialized) throws IOException {
+ private PinotWriterState deserializeV1(byte[] serialized) throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
long minTimestamp = in.readLong();
@@ -77,7 +76,7 @@ public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer
for (int i = 0; i < size; i++) {
serializedElements.add(in.readUTF());
}
- return new PinotSinkWriterState(serializedElements, minTimestamp, maxTimestamp);
+ return new PinotWriterState(serializedElements, minTimestamp, maxTimestamp);
}
}
}
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotClusterContainer.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotClusterContainer.java
new file mode 100644
index 0000000..448c064
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotClusterContainer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.com.google.common.net.HostAndPort;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import static java.lang.String.format;
+import static org.testcontainers.utility.DockerImageName.parse;
+
+@Testcontainers
+public class PinotClusterContainer implements Closeable {
+
+ private static final String PINOT_IMAGE_NAME = "apachepinot/pinot:0.6.0";
+ private static final String ZOOKEEPER_IMAGE_NAME = "zookeeper:latest";
+
+ private static final String ZOOKEEPER_INTERNAL_HOST = "zookeeper";
+
+ private static final int ZOOKEEPER_PORT = 2181;
+ private static final int CONTROLLER_PORT = 9000;
+ private static final int BROKER_PORT = 8099;
+ private static final int SERVER_ADMIN_PORT = 8097;
+ private static final int SERVER_PORT = 8098;
+ private static final int GRPC_PORT = 8090;
+
+ @Container
+ private final GenericContainer<?> controller;
+ @Container
+ private final GenericContainer<?> broker;
+ @Container
+ private final GenericContainer<?> server;
+ @Container
+ private final GenericContainer<?> zookeeper;
+
+ public PinotClusterContainer() {
+ Network network = Network.newNetwork();
+
+ zookeeper = new GenericContainer<>(parse(ZOOKEEPER_IMAGE_NAME))
+ .withStartupAttempts(3)
+ .withNetwork(network)
+ .withNetworkAliases(ZOOKEEPER_INTERNAL_HOST)
+ .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT))
+ .withExposedPorts(ZOOKEEPER_PORT);
+
+ controller = new GenericContainer<>(parse(PINOT_IMAGE_NAME))
+ .withStartupAttempts(3)
+ .withNetwork(network)
+ .withClasspathResourceMapping("/pinot-controller", "/var/pinot/controller/config", BindMode.READ_ONLY)
+ .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-controller-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
+ .withCommand("StartController", "-configFileName", "/var/pinot/controller/config/pinot-controller.conf")
+ .withNetworkAliases("pinot-controller", "localhost")
+ .withExposedPorts(CONTROLLER_PORT);
+
+ broker = new GenericContainer<>(parse(PINOT_IMAGE_NAME))
+ .withStartupAttempts(3)
+ .withNetwork(network)
+ .withClasspathResourceMapping("/pinot-broker", "/var/pinot/broker/config", BindMode.READ_ONLY)
+ .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-broker-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
+ .withCommand("StartBroker", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/broker/config/pinot-broker.conf")
+ .withNetworkAliases("pinot-broker", "localhost")
+ .withExposedPorts(BROKER_PORT);
+
+ server = new GenericContainer<>(parse(PINOT_IMAGE_NAME))
+ .withStartupAttempts(3)
+ .withNetwork(network)
+ .withClasspathResourceMapping("/pinot-server", "/var/pinot/server/config", BindMode.READ_ONLY)
+ .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-server-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
+ .withCommand("StartServer", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/server/config/pinot-server.conf")
+ .withNetworkAliases("pinot-server", "localhost")
+ .withExposedPorts(SERVER_PORT, SERVER_ADMIN_PORT, GRPC_PORT);
+ }
+
+ public void start() {
+ zookeeper.start();
+ controller.start();
+ broker.start();
+ server.start();
+ }
+
+ public void stop() {
+ server.stop();
+ broker.stop();
+ controller.stop();
+ zookeeper.stop();
+ }
+
+ private String getZookeeperInternalHostPort() {
+ return format("%s:%s", ZOOKEEPER_INTERNAL_HOST, ZOOKEEPER_PORT);
+ }
+
+ public HostAndPort getControllerHostAndPort() {
+ return HostAndPort.fromParts(controller.getHost(), controller.getMappedPort(CONTROLLER_PORT));
+ }
+
+ public HostAndPort getBrokerHostAndPort() {
+ return HostAndPort.fromParts(broker.getHost(), broker.getMappedPort(BROKER_PORT));
+ }
+
+ public HostAndPort getServerHostAndPort() {
+ return HostAndPort.fromParts(server.getHost(), server.getMappedPort(SERVER_PORT));
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ stop();
+ }
+}
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
index 73a4403..aeec3dc 100644
--- a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
@@ -156,7 +156,11 @@ public class PinotTestHelper implements Closeable {
return pinotResultSetGroup.getResultSet(0);
} finally {
if (brokerConnection != null) {
- brokerConnection.close();
+ try {
+ brokerConnection.close();
+ } catch (Exception e) { // best-effort close: a failure here must not replace the result/exception of the try block
+ LOG.warn("Failed to close Pinot broker connection", e);
+ }
}
}
}
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkTest.java
similarity index 90%
rename from flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
rename to flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkTest.java
index 8649eeb..9ae5a76 100644
--- a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkTest.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,27 +16,27 @@
* limitations under the License.
*/
-
-package org.apache.flink.streaming.connectors.pinot;
+package org.apache.flink.streaming.connectors.pinot.v2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.pinot.LocalFileSystemAdapter;
import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
-import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
import org.apache.pinot.client.PinotClientException;
import org.apache.pinot.client.ResultSet;
import org.junit.jupiter.api.Assertions;
@@ -54,10 +55,7 @@ import java.util.stream.IntStream;
import static org.apache.flink.util.Preconditions.checkArgument;
-/**
- * E2e tests for Pinot Sink using BATCH and STREAMING execution mode
- */
-public class PinotSinkTest extends PinotTestBase {
+class PinotSinkTest extends PinotTestBase {
private static final int MAX_ROWS_PER_SEGMENT = 5;
private static final long STREAMING_CHECKPOINTING_INTERVAL = 50;
@@ -157,7 +155,7 @@ public class PinotSinkTest extends PinotTestBase {
public void testFailureRecoveryInStreamingSink() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
- env.setParallelism(1);
+ env.setParallelism(2);
env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
List<String> rawData = getRawTestData(20);
@@ -193,7 +191,7 @@ public class PinotSinkTest extends PinotTestBase {
* @return resulting data stream
*/
private DataStream<SingleColumnTableRow> setupStreamingDataSource(StreamExecutionEnvironment env, List<String> rawDataValues) {
- StreamingSource source = new StreamingSource.Builder(rawDataValues, 10).build();
+ PinotSinkTest.StreamingSource source = new PinotSinkTest.StreamingSource.Builder(rawDataValues, 10).build();
return env.addSource(source)
.name("Test input");
}
@@ -207,7 +205,7 @@ public class PinotSinkTest extends PinotTestBase {
* @return resulting data stream
*/
private DataStream<SingleColumnTableRow> setupFailingStreamingDataSource(StreamExecutionEnvironment env, List<String> rawDataValues, int failOnceAtNthElement) {
- StreamingSource source = new StreamingSource.Builder(rawDataValues, 10)
+ PinotSinkTest.StreamingSource source = new PinotSinkTest.StreamingSource.Builder(rawDataValues, 10)
.raiseFailureOnce(failOnceAtNthElement)
.build();
return env.addSource(source)
@@ -257,9 +255,9 @@ public class PinotSinkTest extends PinotTestBase {
FileSystemAdapter fsAdapter = new LocalFileSystemAdapter(tempDirPrefix);
JsonSerializer<SingleColumnTableRow> jsonSerializer = new SingleColumnTableRowSerializer();
- EventTimeExtractor<SingleColumnTableRow> eventTimeExtractor = new SingleColumnTableRowEventTimeExtractor();
+ EventTimeExtractor<SingleColumnTableRow> eventTimeExtractor = new PinotSinkTest.SingleColumnTableRowEventTimeExtractor();
- PinotSink<SingleColumnTableRow> sink = new PinotSink.Builder<SingleColumnTableRow>(getPinotHost(), getPinotControllerPort(), getTableName())
+ PinotSink<SingleColumnTableRow> sink = new PinotSinkBuilder<SingleColumnTableRow>(getPinotHost(), getPinotControllerPort(), getTableName())
.withMaxRowsPerSegment(MAX_ROWS_PER_SEGMENT)
.withTempDirectoryPrefix(tempDirPrefix)
.withJsonSerializer(jsonSerializer)
@@ -405,9 +403,9 @@ public class PinotSinkTest extends PinotTestBase {
* to Pinot by then, as we require {@link #failOnceAtNthElement} to be greater than
* {@link #MAX_ROWS_PER_SEGMENT} (at a parallelism of 1). This allows to check whether the
* snapshot creation and failure recovery in
- * {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter} works properly,
+ * {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter} works properly,
* respecting the already committed elements and those that are stored in an active
- * {@link org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment}. Committed
+ * {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriterSegment}. Committed
* elements must not be saved to the snapshot while those in an active segment must be saved
* to the snapshot in order to enable later-on recovery.
*
@@ -460,15 +458,15 @@ public class PinotSinkTest extends PinotTestBase {
this.sleepDurationMs = sleepDurationMs;
}
- public Builder raiseFailureOnce(int failOnceAtNthElement) {
+ public PinotSinkTest.StreamingSource.Builder raiseFailureOnce(int failOnceAtNthElement) {
checkArgument(failOnceAtNthElement > MAX_ROWS_PER_SEGMENT,
"failOnceAtNthElement (if set) is required to be larger than the number of elements per segment (MAX_ROWS_PER_SEGMENT).");
this.failOnceAtNthElement = failOnceAtNthElement;
return this;
}
- public StreamingSource build() {
- return new StreamingSource(rawDataValues, sleepDurationMs, failOnceAtNthElement);
+ public PinotSinkTest.StreamingSource build() {
+ return new PinotSinkTest.StreamingSource(rawDataValues, sleepDurationMs, failOnceAtNthElement);
}
}
}
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotTestBase.java
similarity index 73%
rename from flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
rename to flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotTestBase.java
index a5f5021..d9d7bf0 100644
--- a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotTestBase.java
@@ -16,11 +16,12 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.pinot;
+package org.apache.flink.streaming.connectors.pinot.v2;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.PinotClusterContainer;
+import org.apache.flink.streaming.connectors.pinot.PinotTestHelper;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
import org.apache.flink.util.TestLogger;
import org.apache.pinot.spi.config.table.*;
import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -28,61 +29,29 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
import java.io.IOException;
-import java.time.Duration;
/**
* Base class for PinotSink e2e tests
*/
-@Testcontainers
public class PinotTestBase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(PinotTestBase.class);
- private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0";
- private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000;
- private static final Integer PINOT_INTERNAL_CONTROLLER_PORT = 9000;
-
+ private static final PinotClusterContainer pinotCluster = new PinotClusterContainer();
protected static TableConfig TABLE_CONFIG;
protected static final Schema TABLE_SCHEMA = PinotTableConfig.getTableSchema();
protected static PinotTestHelper pinotHelper;
- /**
- * Creates the Pinot testcontainer. We delay the start of tests until Pinot has started all
- * internal components. This is identified through a log statement.
- */
- @Container
- public static GenericContainer<?> pinot = new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))
- .withCommand("QuickStart", "-type", "batch")
- .withExposedPorts(PINOT_INTERNAL_BROKER_PORT, PINOT_INTERNAL_CONTROLLER_PORT)
- .waitingFor(
- // Wait for controller, server and broker instances to be available
- new HttpWaitStrategy()
- .forPort(PINOT_INTERNAL_CONTROLLER_PORT)
- .forPath("/instances")
- .forStatusCode(200)
- .forResponsePredicate(res -> {
- try {
- JsonNode instances = JsonUtils.stringToJsonNode(res).get("instances");
- // Expect 3 instances to be up and running (controller, broker and server)
- return instances.size() == 3;
- } catch (IOException e) {
- LOG.error("Error while reading json response in HttpWaitStrategy.", e);
- }
- return false;
- })
- // Allow Pinot to take up to 180s for starting up
- .withStartupTimeout(Duration.ofSeconds(180))
- );
+ @BeforeAll
+ public static void setUpAll() {
+ pinotCluster.start();
+ }
/**
* Creates a new instance of the {@link PinotTestHelper} using the testcontainer port mappings
@@ -122,7 +91,7 @@ public class PinotTestBase extends TestLogger {
* @return Pinot container host
*/
protected String getPinotHost() {
- return pinot.getHost();
+ return pinotCluster.getControllerHostAndPort().getHostText();
}
@@ -132,7 +101,7 @@ public class PinotTestBase extends TestLogger {
* @return Pinot controller port
*/
protected String getPinotControllerPort() {
- return pinot.getMappedPort(PINOT_INTERNAL_CONTROLLER_PORT).toString();
+ return String.valueOf(pinotCluster.getControllerHostAndPort().getPort());
}
/**
@@ -141,7 +110,7 @@ public class PinotTestBase extends TestLogger {
* @return Pinot broker port
*/
private String getPinotBrokerPort() {
- return pinot.getMappedPort(PINOT_INTERNAL_BROKER_PORT).toString();
+ return String.valueOf(pinotCluster.getBrokerHostAndPort().getPort());
}
/**
@@ -175,6 +144,14 @@ public class PinotTestBase extends TestLogger {
public void setTimestamp(Long timestamp) {
this._timestamp = timestamp;
}
+
+ @Override
+ public String toString() {
+ return "SingleColumnTableRow{" +
+ "_col1='" + _col1 + '\'' +
+ ", _timestamp=" + _timestamp +
+ '}';
+ }
}
/**
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/log4j.properties
index 570d6f6..82631a4 100644
--- a/flink-connector-pinot/src/test/resources/log4j.properties
+++ b/flink-connector-pinot/src/test/resources/log4j.properties
@@ -25,3 +25,16 @@ log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+
+status=warn
+appender.console.type=Console
+appender.console.name=LogToConsole
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} - %highlight{%5p} %style{%logger{36}}{cyan} - %m%n%throwable
+
+### Logger Flink Pinot connector ###
+logger.apacheFlink.name=org.apache.flink.streaming.connectors.pinot
+logger.apacheFlink.level=debug
+logger.apacheFlink.additivity=false
+logger.apacheFlink.appenderRef.console.ref=LogToConsole
diff --git a/flink-connector-pinot/src/test/resources/log4j2-test.properties b/flink-connector-pinot/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..330e112
--- /dev/null
+++ b/flink-connector-pinot/src/test/resources/log4j2-test.properties
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# The root logger is OFF so that test runs do not flood the build logs;
+# set rootLogger.level to INFO manually when debugging.
+status=WARN
+appender.console.type=Console
+appender.console.name=LogToConsole
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} - %highlight{%5p} %style{%logger{36}}{cyan} - %m%n%throwable
+### Logger test containers ###
+logger.testContainers.name=org.testcontainers
+logger.testContainers.level=WARN
+logger.testContainers.additivity=false
+logger.testContainers.appenderRef.console.ref=LogToConsole
+### Logger Docker Java ###
+logger.dockerJava.name=com.github.dockerjava
+logger.dockerJava.level=WARN
+logger.dockerJava.additivity=false
+logger.dockerJava.appenderRef.console.ref=LogToConsole
+### Logger Apache Flink ###
+logger.apacheFlink.name=org.apache.flink
+logger.apacheFlink.level=WARN
+logger.apacheFlink.additivity=false
+logger.apacheFlink.appenderRef.console.ref=LogToConsole
+### Logger Apache Streaming Connectors ###
+logger.streamingConnectors.name=org.apache.flink.streaming.connectors
+logger.streamingConnectors.level=WARN
+logger.streamingConnectors.additivity=false
+logger.streamingConnectors.appenderRef.console.ref=LogToConsole
+# Root Logger
+rootLogger.level=OFF
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/pinot-broker/pinot-broker.conf
similarity index 71%
copy from flink-connector-pinot/src/test/resources/log4j.properties
copy to flink-connector-pinot/src/test/resources/pinot-broker/pinot-broker.conf
index 570d6f6..3cd74a2 100644
--- a/flink-connector-pinot/src/test/resources/log4j.properties
+++ b/flink-connector-pinot/src/test/resources/pinot-broker/pinot-broker.conf
@@ -16,12 +16,6 @@
# limitations under the License.
################################################################################
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.out
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+pinot.broker.client.queryPort=8099
+pinot.broker.routing.table.builder.class=random
+pinot.set.instance.id.to.hostname=true
\ No newline at end of file
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/pinot-controller/pinot-controller.conf
similarity index 71%
copy from flink-connector-pinot/src/test/resources/log4j.properties
copy to flink-connector-pinot/src/test/resources/pinot-controller/pinot-controller.conf
index 570d6f6..d075e99 100644
--- a/flink-connector-pinot/src/test/resources/log4j.properties
+++ b/flink-connector-pinot/src/test/resources/pinot-controller/pinot-controller.conf
@@ -16,12 +16,10 @@
# limitations under the License.
################################################################################
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.out
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+controller.helix.cluster.name=pinot
+controller.host=pinot-controller
+controller.port=9000
+controller.data.dir=/var/pinot/controller/data/data
+controller.local.temp.dir=/var/pinot/controller/data
+controller.zk.str=zookeeper:2181
+pinot.set.instance.id.to.hostname=true
\ No newline at end of file
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/pinot-server/pinot-server.conf
similarity index 71%
copy from flink-connector-pinot/src/test/resources/log4j.properties
copy to flink-connector-pinot/src/test/resources/pinot-server/pinot-server.conf
index 570d6f6..219427a 100644
--- a/flink-connector-pinot/src/test/resources/log4j.properties
+++ b/flink-connector-pinot/src/test/resources/pinot-server/pinot-server.conf
@@ -16,12 +16,9 @@
# limitations under the License.
################################################################################
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.out
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+pinot.server.netty.port=8098
+pinot.server.adminapi.port=8097
+pinot.server.instance.dataDir=/var/pinot/server/data/index
+pinot.server.instance.segmentTarDir=/var/pinot/server/data/segment
+pinot.set.instance.id.to.hostname=true
+pinot.server.grpc.enable=true
\ No newline at end of file
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 9b04ac8..374c288 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -60,11 +60,11 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
@@ -73,7 +73,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -85,7 +85,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>flink-test-utils</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
index 2bbbcb7..ea5a722 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
@@ -27,7 +27,7 @@ import static org.apache.flink.util.NetUtils.getAvailablePort;
public abstract class RedisITCaseBase extends AbstractTestBase {
- public static final int REDIS_PORT = getAvailablePort();
+ public static final int REDIS_PORT = getAvailablePort().getPort();
public static final String REDIS_HOST = "127.0.0.1";
private static RedisServer redisServer;
diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
index 33dca44..3a7c86e 100644
--- a/flink-library-siddhi/pom.xml
+++ b/flink-library-siddhi/pom.xml
@@ -91,11 +91,11 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>flink-test-utils</artifactId>
</dependency>
</dependencies>
diff --git a/pom.xml b/pom.xml
index 5d65561..8bfd3ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
<maven.compiler.target>${java.version}</maven.compiler.target>
<!-- Flink version -->
- <flink.version>1.14.6</flink.version>
+ <flink.version>1.15.3</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>
@@ -123,7 +123,7 @@
<mockito.version>1.10.19</mockito.version>
<scalacheck.version>1.14.0</scalacheck.version>
<scalatest.version>3.0.5</scalatest.version>
- <testcontainers.version>1.16.2</testcontainers.version>
+ <testcontainers.version>1.17.6</testcontainers.version>
<!-- JvmArg execution -->
<PermGen>64m</PermGen>
@@ -139,7 +139,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -177,12 +177,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
@@ -199,7 +199,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
@@ -216,7 +216,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>