You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/31 17:02:33 UTC
[2/2] flink git commit: [hotfix] [cassandra connector] Fix minor
issues in CassandraConnectorTest.
[hotfix] [cassandra connector] Fix minor issues in CassandraConnectorTest.
The test now properly uses and reuses a mini cluster, rather than spawning a local environment for each test.
This also properly renames the CassandraConnectorTest to CassandraConnectorITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cd9bb5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cd9bb5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cd9bb5f
Branch: refs/heads/master
Commit: 7cd9bb5f1e09ad2fdbe2b7872f92432dcfbad374
Parents: 97a83a1
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 31 16:54:08 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 31 18:44:29 2016 +0200
----------------------------------------------------------------------
.../cassandra/CassandraCommitter.java | 13 +-
.../connectors/cassandra/CassandraPojoSink.java | 12 +-
.../cassandra/CassandraConnectorITCase.java | 460 +++++++++++++++++++
.../cassandra/CassandraConnectorTest.java | 426 -----------------
.../operators/WriteAheadSinkTestBase.java | 8 +-
5 files changed, 479 insertions(+), 440 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
index 5dceb60..e83b1be 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -26,11 +26,14 @@ import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
/**
* CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
* database.
- * <p/>
- * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
+ *
+ * <p>Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
*/
public class CassandraCommitter extends CheckpointCommitter {
- private ClusterBuilder builder;
+
+ private static final long serialVersionUID = 1L;
+
+ private final ClusterBuilder builder;
private transient Cluster cluster;
private transient Session session;
@@ -54,9 +57,6 @@ public class CassandraCommitter extends CheckpointCommitter {
/**
* Internally used to set the job ID after instantiation.
- *
- * @param id
- * @throws Exception
*/
public void setJobId(String id) throws Exception {
super.setJobId(id);
@@ -66,7 +66,6 @@ public class CassandraCommitter extends CheckpointCommitter {
/**
* Generates the necessary tables to store information.
*
- * @return
* @throws Exception
*/
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 204a0f3..650c481 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -23,13 +23,19 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.flink.configuration.Configuration;
/**
- * Flink Sink to save data into a Cassandra cluster using {@link Mapper}, which
- * it uses annotations from {@link com.datastax.driver.mapping}.
+ * Flink Sink to save data into a Cassandra cluster using
+ * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
+ * which it uses annotations from
+ * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
+ * com.datastax.driver.mapping.annotations</a>.
*
* @param <IN> Type of the elements emitted by this sink
*/
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
- protected Class<IN> clazz;
+
+ private static final long serialVersionUID = 1L;
+
+ protected final Class<IN> clazz;
protected transient Mapper<IN> mapper;
protected transient MappingManager mappingManager;
http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
new file mode 100644
index 0000000..9388818
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+
+import org.apache.cassandra.service.CassandraDaemon;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.test.util.TestEnvironment;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.internal.AssumptionViolatedException;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Scanner;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+@PowerMockIgnore("javax.management.*")
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+ private static File tmpDir;
+
+ private static final boolean EMBEDDED = true;
+
+ private static EmbeddedCassandraService cassandra;
+
+ private static ClusterBuilder builder = new ClusterBuilder() {
+ @Override
+ protected Cluster buildCluster(Cluster.Builder builder) {
+ return builder
+ .addContactPoint("127.0.0.1")
+ .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+ .withoutJMXReporting()
+ .withoutMetrics().build();
+ }
+ };
+
+ private static Cluster cluster;
+ private static Session session;
+
+ private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+ private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
+ private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
+ private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
+ private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
+ private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
+
+ private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+
+ static {
+ for (int i = 0; i < 20; i++) {
+ collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
+ }
+ }
+
+ private static class EmbeddedCassandraService {
+ CassandraDaemon cassandraDaemon;
+
+ public void start() throws IOException {
+ this.cassandraDaemon = new CassandraDaemon();
+ this.cassandraDaemon.init(null);
+ this.cassandraDaemon.start();
+ }
+
+ public void stop() {
+ this.cassandraDaemon.stop();
+ }
+ }
+
+ private static ForkableFlinkMiniCluster flinkCluster;
+
+ // ------------------------------------------------------------------------
+ // Cluster Setup (Cassandra & Flink)
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void startCassandra() throws IOException {
+
+ // check if we should run this test, current Cassandra version requires Java >= 1.8
+ try {
+ String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
+ float javaVersion = Float.parseFloat(javaVersionString);
+ Assume.assumeTrue(javaVersion >= 1.8f);
+ }
+ catch (AssumptionViolatedException e) {
+ System.out.println("Skipping CassandraConnectorITCase, because the JDK is < Java 8+");
+ throw e;
+ }
+ catch (Exception e) {
+ LOG.error("Cannot determine Java version", e);
+ e.printStackTrace();
+ fail("Cannot determine Java version");
+ }
+
+ // generate temporary files
+ tmpDir = CommonTestUtils.createTempDirectory();
+ ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
+ File file = new File(classLoader.getResource("cassandra.yaml").getFile());
+ File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
+
+ assertTrue(tmp.createNewFile());
+ BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+ //copy cassandra.yaml; inject absolute paths into cassandra.yaml
+ Scanner scanner = new Scanner(file);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ line = line.replace("$PATH", "'" + tmp.getParentFile());
+ b.write(line + "\n");
+ b.flush();
+ }
+ scanner.close();
+
+
+ // Tell cassandra where the configuration files are.
+ // Use the test configuration file.
+ System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
+
+ if (EMBEDDED) {
+ cassandra = new EmbeddedCassandraService();
+ cassandra.start();
+ }
+
+ try {
+ Thread.sleep(1000 * 10);
+ } catch (InterruptedException e) { //give cassandra a few seconds to start up
+ }
+
+ cluster = builder.getCluster();
+ session = cluster.connect();
+
+ session.execute(CREATE_KEYSPACE_QUERY);
+ session.execute(CREATE_TABLE_QUERY);
+ }
+
+ @BeforeClass
+ public static void startFlink() throws Exception {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+ flinkCluster = new ForkableFlinkMiniCluster(config);
+ flinkCluster.start();
+ }
+
+ @AfterClass
+ public static void stopFlink() {
+ flinkCluster.stop();
+ }
+
+ @AfterClass
+ public static void closeCassandra() {
+ if (session != null) {
+ session.executeAsync(DROP_KEYSPACE_QUERY);
+ session.close();
+ }
+
+ if (cluster != null) {
+ cluster.close();
+ }
+
+ if (cassandra != null) {
+ cassandra.stop();
+ }
+
+ if (tmpDir != null) {
+ //noinspection ResultOfMethodCallIgnored
+ tmpDir.delete();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Test preparation & cleanup
+ // ------------------------------------------------------------------------
+
+ @Before
+ public void initializeExecutionEnvironment() {
+ TestStreamEnvironment.setAsContext(flinkCluster, 4);
+ new TestEnvironment(flinkCluster, 4, false).setAsContext();
+ }
+
+ @After
+ public void deleteSchema() throws Exception {
+ session.executeAsync(CLEAR_TABLE_QUERY);
+ }
+
+ // ------------------------------------------------------------------------
+ // Exactly-once Tests
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
+ return new CassandraTupleWriteAheadSink<>(
+ INSERT_DATA_QUERY,
+ TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
+ builder,
+ new CassandraCommitter(builder));
+ }
+
+ @Override
+ protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
+ return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
+ }
+
+ @Override
+ protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
+ return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
+ }
+
+ @Override
+ protected void verifyResultsIdealCircumstances(
+ OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+ OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+ CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+
+ ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ArrayList<Integer> list = new ArrayList<>();
+ for (int x = 1; x <= 60; x++) {
+ list.add(x);
+ }
+
+ for (Row s : result) {
+ list.remove(new Integer(s.getInt("counter")));
+ }
+ Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+ }
+
+ @Override
+ protected void verifyResultsDataPersistenceUponMissedNotify(
+ OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+ OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+ CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+
+ ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ArrayList<Integer> list = new ArrayList<>();
+ for (int x = 1; x <= 60; x++) {
+ list.add(x);
+ }
+
+ for (Row s : result) {
+ list.remove(new Integer(s.getInt("counter")));
+ }
+ Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+ }
+
+ @Override
+ protected void verifyResultsDataDiscardingUponRestore(
+ OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
+ OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
+ CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+
+ ResultSet result = session.execute(SELECT_DATA_QUERY);
+ ArrayList<Integer> list = new ArrayList<>();
+ for (int x = 1; x <= 20; x++) {
+ list.add(x);
+ }
+ for (int x = 41; x <= 60; x++) {
+ list.add(x);
+ }
+
+ for (Row s : result) {
+ list.remove(new Integer(s.getInt("counter")));
+ }
+ Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
+ }
+
+ @Test
+ public void testCassandraCommitter() throws Exception {
+ CassandraCommitter cc1 = new CassandraCommitter(builder);
+ cc1.setJobId("job");
+ cc1.setOperatorId("operator");
+ cc1.setOperatorSubtaskId(0);
+
+ CassandraCommitter cc2 = new CassandraCommitter(builder);
+ cc2.setJobId("job");
+ cc2.setOperatorId("operator");
+ cc2.setOperatorSubtaskId(1);
+
+ CassandraCommitter cc3 = new CassandraCommitter(builder);
+ cc3.setJobId("job");
+ cc3.setOperatorId("operator1");
+ cc3.setOperatorSubtaskId(0);
+
+ cc1.createResource();
+
+ cc1.open();
+ cc2.open();
+ cc3.open();
+
+ Assert.assertFalse(cc1.isCheckpointCommitted(1));
+ Assert.assertFalse(cc2.isCheckpointCommitted(1));
+ Assert.assertFalse(cc3.isCheckpointCommitted(1));
+
+ cc1.commitCheckpoint(1);
+ Assert.assertTrue(cc1.isCheckpointCommitted(1));
+ //verify that other sub-tasks aren't affected
+ Assert.assertFalse(cc2.isCheckpointCommitted(1));
+ //verify that other tasks aren't affected
+ Assert.assertFalse(cc3.isCheckpointCommitted(1));
+
+ Assert.assertFalse(cc1.isCheckpointCommitted(2));
+
+ cc1.close();
+ cc2.close();
+ cc3.close();
+
+ cc1 = new CassandraCommitter(builder);
+ cc1.setJobId("job");
+ cc1.setOperatorId("operator");
+ cc1.setOperatorSubtaskId(0);
+
+ cc1.open();
+
+ //verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
+ Assert.assertTrue(cc1.isCheckpointCommitted(1));
+ Assert.assertFalse(cc1.isCheckpointCommitted(2));
+
+ cc1.close();
+ }
+
+ // ------------------------------------------------------------------------
+ // At-least-once Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testCassandraTupleAtLeastOnceSink() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
+ source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+
+ env.execute();
+
+ ResultSet rs = session.execute(SELECT_DATA_QUERY);
+ Assert.assertEquals(20, rs.all().size());
+ }
+
+ @Test
+ public void testCassandraPojoAtLeastOnceSink() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStreamSource<Pojo> source = env
+ .addSource(new SourceFunction<Pojo>() {
+
+ private boolean running = true;
+ private volatile int cnt = 0;
+
+ @Override
+ public void run(SourceContext<Pojo> ctx) throws Exception {
+ while (running) {
+ ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
+ cnt++;
+ if (cnt == 20) {
+ cancel();
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
+
+ env.execute();
+
+ ResultSet rs = session.execute(SELECT_DATA_QUERY);
+ Assert.assertEquals(20, rs.all().size());
+ }
+
+ @Test
+ public void testCassandraBatchFormats() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
+ dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+
+ env.execute("Write data");
+
+ DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
+ new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
+ TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
+
+
+ long count = inputDS.count();
+ Assert.assertEquals(count, 20L);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
deleted file mode 100644
index 2018255..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-
-import org.apache.cassandra.service.CassandraDaemon;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.internal.AssumptionViolatedException;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Scanner;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
- private static File tmpDir;
-
- private static final boolean EMBEDDED = true;
-
- private static EmbeddedCassandraService cassandra;
-
- private static ClusterBuilder builder = new ClusterBuilder() {
- @Override
- protected Cluster buildCluster(Cluster.Builder builder) {
- return builder
- .addContactPoint("127.0.0.1")
- .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
- .withoutJMXReporting()
- .withoutMetrics().build();
- }
- };
-
- private static Cluster cluster;
- private static Session session;
-
- private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
- private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
- private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
- private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
- private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
- private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
-
- private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
-
- static {
- for (int i = 0; i < 20; i++) {
- collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
- }
- }
-
- private static class EmbeddedCassandraService {
- CassandraDaemon cassandraDaemon;
-
- public void start() throws IOException {
- this.cassandraDaemon = new CassandraDaemon();
- this.cassandraDaemon.init(null);
- this.cassandraDaemon.start();
- }
-
- public void stop() {
- this.cassandraDaemon.stop();
- }
- }
-
- // ------------------------------------------------------------------------
- // Cassandra Cluster Setup
- // ------------------------------------------------------------------------
-
- @BeforeClass
- public static void startCassandra() throws IOException {
-
- // check if we should run this test, current Cassandra version requires Java >= 1.8
- try {
- String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
- float javaVersion = Float.parseFloat(javaVersionString);
- Assume.assumeTrue(javaVersion >= 1.8f);
- }
- catch (AssumptionViolatedException e) {
- System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+");
- throw e;
- }
- catch (Exception e) {
- LOG.error("Cannot determine Java version", e);
- e.printStackTrace();
- fail("Cannot determine Java version");
- }
-
- // generate temporary files
- tmpDir = CommonTestUtils.createTempDirectory();
- ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader();
- File file = new File(classLoader.getResource("cassandra.yaml").getFile());
- File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-
- assertTrue(tmp.createNewFile());
- BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
-
- //copy cassandra.yaml; inject absolute paths into cassandra.yaml
- Scanner scanner = new Scanner(file);
- while (scanner.hasNextLine()) {
- String line = scanner.nextLine();
- line = line.replace("$PATH", "'" + tmp.getParentFile());
- b.write(line + "\n");
- b.flush();
- }
- scanner.close();
-
-
- // Tell cassandra where the configuration files are.
- // Use the test configuration file.
- System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
-
- if (EMBEDDED) {
- cassandra = new EmbeddedCassandraService();
- cassandra.start();
- }
-
- try {
- Thread.sleep(1000 * 10);
- } catch (InterruptedException e) { //give cassandra a few seconds to start up
- }
-
- cluster = builder.getCluster();
- session = cluster.connect();
-
- session.execute(CREATE_KEYSPACE_QUERY);
- session.execute(CREATE_TABLE_QUERY);
- }
-
- @Before
- public void checkIfIgnore() {
-
- }
-
- @After
- public void deleteSchema() throws Exception {
- session.executeAsync(CLEAR_TABLE_QUERY);
- }
-
- @AfterClass
- public static void closeCassandra() {
- if (session != null) {
- session.executeAsync(DROP_KEYSPACE_QUERY);
- session.close();
- }
-
- if (cluster != null) {
- cluster.close();
- }
-
- if (cassandra != null) {
- cassandra.stop();
- }
-
- if (tmpDir != null) {
- //noinspection ResultOfMethodCallIgnored
- tmpDir.delete();
- }
- }
-
- //=====Exactly-Once=================================================================================================
- @Override
- protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
- return new CassandraTupleWriteAheadSink<>(
- INSERT_DATA_QUERY,
- TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
- builder,
- new CassandraCommitter(builder));
- }
-
- @Override
- protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
- return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
- }
-
- @Override
- protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
- return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
- }
-
- @Override
- protected void verifyResultsIdealCircumstances(
- OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
- OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
- CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
- ResultSet result = session.execute(SELECT_DATA_QUERY);
- ArrayList<Integer> list = new ArrayList<>();
- for (int x = 1; x <= 60; x++) {
- list.add(x);
- }
-
- for (Row s : result) {
- list.remove(new Integer(s.getInt("counter")));
- }
- Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
- }
-
- @Override
- protected void verifyResultsDataPersistenceUponMissedNotify(
- OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
- OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
- CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
- ResultSet result = session.execute(SELECT_DATA_QUERY);
- ArrayList<Integer> list = new ArrayList<>();
- for (int x = 1; x <= 60; x++) {
- list.add(x);
- }
-
- for (Row s : result) {
- list.remove(new Integer(s.getInt("counter")));
- }
- Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
- }
-
- @Override
- protected void verifyResultsDataDiscardingUponRestore(
- OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
- OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task,
- CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
- ResultSet result = session.execute(SELECT_DATA_QUERY);
- ArrayList<Integer> list = new ArrayList<>();
- for (int x = 1; x <= 20; x++) {
- list.add(x);
- }
- for (int x = 41; x <= 60; x++) {
- list.add(x);
- }
-
- for (Row s : result) {
- list.remove(new Integer(s.getInt("counter")));
- }
- Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
- }
-
- @Test
- public void testCassandraCommitter() throws Exception {
- CassandraCommitter cc1 = new CassandraCommitter(builder);
- cc1.setJobId("job");
- cc1.setOperatorId("operator");
- cc1.setOperatorSubtaskId(0);
-
- CassandraCommitter cc2 = new CassandraCommitter(builder);
- cc2.setJobId("job");
- cc2.setOperatorId("operator");
- cc2.setOperatorSubtaskId(1);
-
- CassandraCommitter cc3 = new CassandraCommitter(builder);
- cc3.setJobId("job");
- cc3.setOperatorId("operator1");
- cc3.setOperatorSubtaskId(0);
-
- cc1.createResource();
-
- cc1.open();
- cc2.open();
- cc3.open();
-
- Assert.assertFalse(cc1.isCheckpointCommitted(1));
- Assert.assertFalse(cc2.isCheckpointCommitted(1));
- Assert.assertFalse(cc3.isCheckpointCommitted(1));
-
- cc1.commitCheckpoint(1);
- Assert.assertTrue(cc1.isCheckpointCommitted(1));
- //verify that other sub-tasks aren't affected
- Assert.assertFalse(cc2.isCheckpointCommitted(1));
- //verify that other tasks aren't affected
- Assert.assertFalse(cc3.isCheckpointCommitted(1));
-
- Assert.assertFalse(cc1.isCheckpointCommitted(2));
-
- cc1.close();
- cc2.close();
- cc3.close();
-
- cc1 = new CassandraCommitter(builder);
- cc1.setJobId("job");
- cc1.setOperatorId("operator");
- cc1.setOperatorSubtaskId(0);
-
- cc1.open();
-
- //verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
- Assert.assertTrue(cc1.isCheckpointCommitted(1));
- Assert.assertFalse(cc1.isCheckpointCommitted(2));
-
- cc1.close();
- }
-
- //=====At-Least-Once================================================================================================
- @Test
- public void testCassandraTupleAtLeastOnceSink() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
- source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
-
- env.execute();
-
- ResultSet rs = session.execute(SELECT_DATA_QUERY);
- Assert.assertEquals(20, rs.all().size());
- }
-
- @Test
- public void testCassandraPojoAtLeastOnceSink() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStreamSource<Pojo> source = env
- .addSource(new SourceFunction<Pojo>() {
-
- private boolean running = true;
- private volatile int cnt = 0;
-
- @Override
- public void run(SourceContext<Pojo> ctx) throws Exception {
- while (running) {
- ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
- cnt++;
- if (cnt == 20) {
- cancel();
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
-
- env.execute();
-
- ResultSet rs = session.execute(SELECT_DATA_QUERY);
- Assert.assertEquals(20, rs.all().size());
- }
-
- @Test
- public void testCassandraBatchFormats() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
- dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
-
- env.execute("Write data");
-
- DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
- new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
- TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
-
-
- long count = inputDS.count();
- Assert.assertEquals(count, 20L);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7cd9bb5f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index e3df9fc..221d7da 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -25,9 +25,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -40,10 +41,9 @@ import java.util.ArrayList;
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore("javax.management.*")
-public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> {
+public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> extends TestLogger {
- protected class OperatorExposingTask<INT> extends OneInputStreamTask<INT, INT> {
+ protected static class OperatorExposingTask<INT> extends OneInputStreamTask<INT, INT> {
public OneInputStreamOperator<INT, INT> getOperator() {
return this.headOperator;
}