You are viewing a plain text version of this content. The canonical (HTML) version is available in the original mailing-list archive.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/12 21:43:21 UTC
[1/5] flink git commit: [FLINK-6561] Disable glob test on Windows
Repository: flink
Updated Branches:
refs/heads/master ef751b2a1 -> 7173774d0
[FLINK-6561] Disable glob test on Windows
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7173774d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7173774d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7173774d
Branch: refs/heads/master
Commit: 7173774d0cd2b9d138587759d324e6633c24b6bd
Parents: 2201fab
Author: zentol <ch...@apache.org>
Authored: Thu May 11 21:24:02 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 12 20:20:53 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/api/common/io/GlobFilePathFilterTest.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7173774d/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
index c9f8da4..bcdb5c3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
@@ -19,6 +19,8 @@ package org.apache.flink.api.common.io;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.junit.Assume;
import org.junit.Test;
import java.io.IOException;
@@ -110,6 +112,8 @@ public class GlobFilePathFilterTest {
@Test
public void testExcludeFilenameWithStart() {
+ Assume.assumeTrue("Windows does not allow asterisks in file names.", !OperatingSystem.isWindows());
+
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.singletonList("\\*"));
[3/5] flink git commit: [FLINK-5101] Track pending records in
CassandraSinkBase
Posted by ch...@apache.org.
[FLINK-5101] Track pending records in CassandraSinkBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/948bb9f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/948bb9f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/948bb9f6
Branch: refs/heads/master
Commit: 948bb9f674a9c4cf95491f3d9a92b38eed6b64e8
Parents: ef751b2
Author: zentol <ch...@apache.org>
Authored: Wed Nov 23 16:59:22 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri May 12 20:20:53 2017 +0200
----------------------------------------------------------------------
.../connectors/cassandra/CassandraPojoSink.java | 7 +--
.../connectors/cassandra/CassandraSinkBase.java | 56 +++++++++++++++-----
2 files changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/948bb9f6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 650c481..9cfb2f8 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.cassandra;
+import com.datastax.driver.core.ResultSet;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.common.util.concurrent.ListenableFuture;
@@ -31,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
*
* @param <IN> Type of the elements emitted by this sink
*/
-public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
+public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
private static final long serialVersionUID = 1L;
@@ -61,7 +62,7 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
}
@Override
- public ListenableFuture<Void> send(IN value) {
- return mapper.saveAsync(value);
+ public ListenableFuture<ResultSet> send(IN value) {
+ return session.executeAsync(mapper.saveQuery(value));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/948bb9f6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 49b1efa..b281525 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
@@ -40,11 +41,13 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
protected transient Cluster cluster;
protected transient Session session;
- protected transient Throwable exception = null;
+ protected transient volatile Throwable exception;
protected transient FutureCallback<V> callback;
private final ClusterBuilder builder;
+ private final AtomicInteger updatesPending = new AtomicInteger();
+
protected CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
@@ -55,11 +58,24 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
this.callback = new FutureCallback<V>() {
@Override
public void onSuccess(V ignored) {
+ int pending = updatesPending.decrementAndGet();
+ if (pending == 0) {
+ synchronized (updatesPending) {
+ updatesPending.notifyAll();
+ }
+ }
}
@Override
public void onFailure(Throwable t) {
+ int pending = updatesPending.decrementAndGet();
+ if (pending == 0) {
+ synchronized (updatesPending) {
+ updatesPending.notifyAll();
+ }
+ }
exception = t;
+
LOG.error("Error while sending value.", t);
}
};
@@ -70,29 +86,43 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
@Override
public void invoke(IN value) throws Exception {
if (exception != null) {
- throw new IOException("invoke() failed", exception);
+ throw new IOException("Error while sending value.", exception);
}
ListenableFuture<V> result = send(value);
+ updatesPending.incrementAndGet();
Futures.addCallback(result, callback);
}
public abstract ListenableFuture<V> send(IN value);
@Override
- public void close() {
+ public void close() throws Exception {
try {
- if (session != null) {
- session.close();
+ if (exception != null) {
+ throw new IOException("Error while sending value.", exception);
}
- } catch (Exception e) {
- LOG.error("Error while closing session.", e);
- }
- try {
- if (cluster != null) {
- cluster.close();
+
+ while (updatesPending.get() > 0) {
+ synchronized (updatesPending) {
+ updatesPending.wait();
+ }
+ }
+
+ } finally {
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing session.", e);
+ }
+ try {
+ if (cluster != null) {
+ cluster.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while closing cluster.", e);
}
- } catch (Exception e) {
- LOG.error("Error while closing cluster.", e);
}
}
}
[2/5] flink git commit: [FLINK-6548] Fix
AvroOutputFormatTest#testCompression on Windows
Posted by ch...@apache.org.
[FLINK-6548] Fix AvroOutputFormatTest#testCompression on Windows
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ebebdfc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ebebdfc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ebebdfc
Branch: refs/heads/master
Commit: 2ebebdfc7561a36d76d9c3f6c44617483db45a44
Parents: 0be04b4
Author: zentol <ch...@apache.org>
Authored: Thu May 11 16:39:12 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 12 20:20:53 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/api/java/io/AvroOutputFormatTest.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2ebebdfc/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
index 4d6c6b7..f843d3b 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
@@ -28,8 +28,6 @@ import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import org.apache.avro.Schema;
import org.apache.flink.api.io.avro.example.User;
@@ -135,12 +133,13 @@ public class AvroOutputFormatTest {
assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
// cleanup
- Files.delete(Paths.get(outputPath.getPath()));
- Files.delete(Paths.get(compressedOutputPath.getPath()));
+ FileSystem fs = FileSystem.getLocalFileSystem();
+ fs.delete(outputPath, false);
+ fs.delete(compressedOutputPath, false);
}
private long fileSize(Path path) throws IOException {
- return Files.size(Paths.get(path.getPath()));
+ return path.getFileSystem().getFileStatus(path).getLen();
}
private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
[4/5] flink git commit: [FLINK-6558] Disable yarn tests on Windows
Posted by ch...@apache.org.
[FLINK-6558] Disable yarn tests on Windows
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2201fab1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2201fab1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2201fab1
Branch: refs/heads/master
Commit: 2201fab1c740b8c3aa6d14ecf0471adf77f5b761
Parents: 2ebebdf
Author: zentol <ch...@apache.org>
Authored: Thu May 11 16:55:11 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 12 20:20:53 2017 +0200
----------------------------------------------------------------------
.../apache/flink/yarn/YarnApplicationMasterRunnerTest.java | 8 ++++++++
.../highavailability/YarnIntraNonHaMasterServicesTest.java | 4 ++++
.../YarnPreConfiguredMasterHaServicesTest.java | 4 ++++
3 files changed, 16 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2201fab1/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
index f874896..4884dd0 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -21,8 +21,11 @@ package org.apache.flink.yarn;
import com.google.common.collect.ImmutableMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.util.OperatingSystem;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assume;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -49,6 +52,11 @@ public class YarnApplicationMasterRunnerTest {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
+ @BeforeClass
+ public static void checkOS() {
+ Assume.assumeTrue(!OperatingSystem.isWindows());
+ }
+
@Test
public void testCreateTaskExecutorContext() throws Exception {
File root = folder.getRoot();
http://git-wip-us.apache.org/repos/asf/flink/blob/2201fab1/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
index 64c22d2..b4d2ba8 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
@@ -24,12 +24,14 @@ import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -70,6 +72,8 @@ public class YarnIntraNonHaMasterServicesTest extends TestLogger {
@BeforeClass
public static void createHDFS() throws Exception {
+ Assume.assumeTrue(!OperatingSystem.isWindows());
+
final File tempDir = TEMP_DIR.newFolder();
org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
http://git-wip-us.apache.org/repos/asf/flink/blob/2201fab1/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
index 83ec37d..07c952a 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
@@ -24,12 +24,14 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -59,6 +61,8 @@ public class YarnPreConfiguredMasterHaServicesTest extends TestLogger {
@BeforeClass
public static void createHDFS() throws Exception {
+ Assume.assumeTrue(!OperatingSystem.isWindows());
+
final File tempDir = TEMP_DIR.newFolder();
org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
[5/5] flink git commit: [FLINK-5101] Refactor CassandraConnectorITCase
Posted by ch...@apache.org.
[FLINK-5101] Refactor CassandraConnectorITCase
This closes #2866.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0be04b45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0be04b45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0be04b45
Branch: refs/heads/master
Commit: 0be04b454f58d5575bd6fab5755aa1264f363b91
Parents: 948bb9f
Author: zentol <ch...@apache.org>
Authored: Wed May 10 12:08:11 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 12 20:20:53 2017 +0200
----------------------------------------------------------------------
.../cassandra/CassandraConnectorITCase.java | 213 ++++++++-----------
1 file changed, 89 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0be04b45/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f2e8f8b..06f3c35 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -28,39 +28,24 @@ import com.datastax.driver.core.Session;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.TestEnvironment;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +56,8 @@ import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import java.util.Random;
import java.util.Scanner;
import java.util.UUID;
@@ -100,12 +87,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
private static Cluster cluster;
private static Session session;
+ private static final String TABLE_NAME_PREFIX = "flink_";
+ private static final String TABLE_NAME_VARIABLE = "$TABLE";
private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
- private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
- private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
- private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
- private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
- private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
+ private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);";
+ private static final String INSERT_DATA_QUERY = "INSERT INTO flink." + TABLE_NAME_VARIABLE + " (id, counter, batch_id) VALUES (?, ?, ?)";
+ private static final String SELECT_DATA_QUERY = "SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';';
+
+ private static final Random random = new Random();
+ private int tableID;
private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
@@ -129,12 +119,6 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
}
- private static LocalFlinkMiniCluster flinkCluster;
-
- // ------------------------------------------------------------------------
- // Cluster Setup (Cassandra & Flink)
- // ------------------------------------------------------------------------
-
@BeforeClass
public static void startCassandra() throws IOException {
@@ -173,39 +157,39 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
cassandra.start();
}
- try {
- Thread.sleep(1000 * 10);
- } catch (InterruptedException e) { //give cassandra a few seconds to start up
+ // start establishing a connection within 30 seconds
+ long start = System.nanoTime();
+ long deadline = start + 30_000_000_000L;
+ while (true) {
+ try {
+ cluster = builder.getCluster();
+ session = cluster.connect();
+ break;
+ } catch (Exception e) {
+ if (System.nanoTime() > deadline) {
+ throw e;
+ }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ignored) {
+ }
+ }
}
-
- cluster = builder.getCluster();
- session = cluster.connect();
+ LOG.debug("Connection established after {}ms.", System.currentTimeMillis() - start);
session.execute(CREATE_KEYSPACE_QUERY);
- session.execute(CREATE_TABLE_QUERY);
- }
-
- @BeforeClass
- public static void startFlink() throws Exception {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
- flinkCluster = new LocalFlinkMiniCluster(config);
- flinkCluster.start();
+ session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
}
- @AfterClass
- public static void stopFlink() {
- if (flinkCluster != null) {
- flinkCluster.stop();
- flinkCluster = null;
- }
+ @Before
+ public void createTable() {
+ tableID = random.nextInt(Integer.MAX_VALUE);
+ session.execute(injectTableName(CREATE_TABLE_QUERY));
}
@AfterClass
public static void closeCassandra() {
if (session != null) {
- session.executeAsync(DROP_KEYSPACE_QUERY);
session.close();
}
@@ -224,28 +208,13 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
// ------------------------------------------------------------------------
- // Test preparation & cleanup
- // ------------------------------------------------------------------------
-
- @Before
- public void initializeExecutionEnvironment() {
- TestStreamEnvironment.setAsContext(flinkCluster, 4);
- new TestEnvironment(flinkCluster, 4, false).setAsContext();
- }
-
- @After
- public void deleteSchema() throws Exception {
- session.executeAsync(CLEAR_TABLE_QUERY);
- }
-
- // ------------------------------------------------------------------------
// Exactly-once Tests
// ------------------------------------------------------------------------
@Override
protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
return new CassandraTupleWriteAheadSink<>(
- INSERT_DATA_QUERY,
+ injectTableName(INSERT_DATA_QUERY),
TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
builder,
new CassandraCommitter(builder));
@@ -264,7 +233,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
@Override
protected void verifyResultsIdealCircumstances(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
- ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 60; x++) {
list.add(x);
@@ -279,7 +248,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
@Override
protected void verifyResultsDataPersistenceUponMissedNotify(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
- ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 60; x++) {
list.add(x);
@@ -294,7 +263,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
@Override
protected void verifyResultsDataDiscardingUponRestore(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
- ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
ArrayList<Integer> list = new ArrayList<>();
for (int x = 1; x <= 20; x++) {
list.add(x);
@@ -324,7 +293,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
}
ArrayList<Integer> actual = new ArrayList<>();
- ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
for (Row s : result) {
actual.add(s.getInt("counter"));
}
@@ -335,16 +304,17 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
@Test
public void testCassandraCommitter() throws Exception {
- CassandraCommitter cc1 = new CassandraCommitter(builder);
- cc1.setJobId("job");
+ String jobID = new JobID().toString();
+ CassandraCommitter cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+ cc1.setJobId(jobID);
cc1.setOperatorId("operator");
- CassandraCommitter cc2 = new CassandraCommitter(builder);
- cc2.setJobId("job");
+ CassandraCommitter cc2 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+ cc2.setJobId(jobID);
cc2.setOperatorId("operator");
- CassandraCommitter cc3 = new CassandraCommitter(builder);
- cc3.setJobId("job");
+ CassandraCommitter cc3 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+ cc3.setJobId(jobID);
cc3.setOperatorId("operator1");
cc1.createResource();
@@ -370,8 +340,8 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
cc2.close();
cc3.close();
- cc1 = new CassandraCommitter(builder);
- cc1.setJobId("job");
+ cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+ cc1.setJobId(jobID);
cc1.setOperatorId("operator");
cc1.open();
@@ -389,70 +359,65 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
@Test
public void testCassandraTupleAtLeastOnceSink() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), builder);
- DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
- source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+ sink.open(new Configuration());
+
+ for (Tuple3<String, Integer, Integer> value : collection) {
+ sink.send(value);
+ }
- env.execute();
+ sink.close();
- ResultSet rs = session.execute(SELECT_DATA_QUERY);
+ ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
Assert.assertEquals(20, rs.all().size());
}
@Test
public void testCassandraPojoAtLeastOnceSink() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStreamSource<Pojo> source = env
- .addSource(new SourceFunction<Pojo>() {
-
- private boolean running = true;
- private volatile int cnt = 0;
-
- @Override
- public void run(SourceContext<Pojo> ctx) throws Exception {
- while (running) {
- ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
- cnt++;
- if (cnt == 20) {
- cancel();
- }
- }
- }
+ session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
- @Override
- public void cancel() {
- running = false;
- }
- });
+ CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
- source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
+ sink.open(new Configuration());
- env.execute();
+ for (int x = 0; x < 20; x++) {
+ sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
+ }
- ResultSet rs = session.execute(SELECT_DATA_QUERY);
+ sink.close();
+
+ ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
Assert.assertEquals(20, rs.all().size());
}
@Test
public void testCassandraBatchFormats() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+ sink.configure(new Configuration());
+ sink.open(0, 1);
- DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
- dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+ for (Tuple3<String, Integer, Integer> value : collection) {
+ sink.writeRecord(value);
+ }
- env.execute("Write data");
+ sink.close();
- DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
- new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
- TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
+ InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder);
+ source.configure(new Configuration());
+ source.open(null);
+ List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
+
+ while (!source.reachedEnd()) {
+ result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
+ }
+
+ source.close();
+ Assert.assertEquals(20, result.size());
+ }
- long count = inputDS.count();
- Assert.assertEquals(count, 20L);
+ private String injectTableName(String target) {
+ return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
}
}