Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/10 12:13:33 UTC

[1/2] flink git commit: [FLINK-5101] Track pending records in CassandraSinkBase

Repository: flink
Updated Branches:
  refs/heads/release-1.3 bb03c07d9 -> 5d6b5a677


[FLINK-5101] Track pending records in CassandraSinkBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a85b1881
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a85b1881
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a85b1881

Branch: refs/heads/release-1.3
Commit: a85b1881d184c02075441f57f3364ec80b2d4f4d
Parents: bb03c07
Author: zentol <ch...@apache.org>
Authored: Wed Nov 23 16:59:22 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Wed May 10 12:07:33 2017 +0200

----------------------------------------------------------------------
 .../connectors/cassandra/CassandraPojoSink.java |  7 +--
 .../connectors/cassandra/CassandraSinkBase.java | 56 +++++++++++++++-----
 2 files changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a85b1881/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 650c481..9cfb2f8 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -31,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
  *
  * @param <IN> Type of the elements emitted by this sink
  */
-public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
+public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -61,7 +62,7 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
 	}
 
 	@Override
-	public ListenableFuture<Void> send(IN value) {
-		return mapper.saveAsync(value);
+	public ListenableFuture<ResultSet> send(IN value) {
+		return session.executeAsync(mapper.saveQuery(value));
 	}
 }
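
For context: the new send() builds the INSERT with the mapper but executes it through the session, so the returned future is typed to ResultSet rather than Void. A minimal sketch of that call chain, assuming the DataStax 2.x driver API this connector uses (the class and constructor here are illustrative, not part of the commit):

----------------------------------------------------------------------
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.mapping.Mapper;
import com.google.common.util.concurrent.ListenableFuture;

class SaveQuerySketch<IN> {
	private final Session session;
	private final Mapper<IN> mapper;

	SaveQuerySketch(Session session, Mapper<IN> mapper) {
		this.session = session;
		this.mapper = mapper;
	}

	// mapper.saveAsync(value) would return ListenableFuture<Void>; building
	// the INSERT with saveQuery(value) and executing it through the session
	// yields a ResultSetFuture, i.e. a ListenableFuture<ResultSet>.
	ListenableFuture<ResultSet> send(IN value) {
		Statement insert = mapper.saveQuery(value);
		return session.executeAsync(insert);
	}
}
----------------------------------------------------------------------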

http://git-wip-us.apache.org/repos/asf/flink/blob/a85b1881/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 49b1efa..b281525 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
@@ -40,11 +41,13 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	protected transient Cluster cluster;
 	protected transient Session session;
 
-	protected transient Throwable exception = null;
+	protected transient volatile Throwable exception;
 	protected transient FutureCallback<V> callback;
 
 	private final ClusterBuilder builder;
 
+	private final AtomicInteger updatesPending = new AtomicInteger();
+
 	protected CassandraSinkBase(ClusterBuilder builder) {
 		this.builder = builder;
 		ClosureCleaner.clean(builder, true);
@@ -55,11 +58,24 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 		this.callback = new FutureCallback<V>() {
 			@Override
 			public void onSuccess(V ignored) {
+				int pending = updatesPending.decrementAndGet();
+				if (pending == 0) {
+					synchronized (updatesPending) {
+						updatesPending.notifyAll();
+					}
+				}
 			}
 
 			@Override
 			public void onFailure(Throwable t) {
+				int pending = updatesPending.decrementAndGet();
+				if (pending == 0) {
+					synchronized (updatesPending) {
+						updatesPending.notifyAll();
+					}
+				}
 				exception = t;
+				
 				LOG.error("Error while sending value.", t);
 			}
 		};
@@ -70,29 +86,43 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN value) throws Exception {
 		if (exception != null) {
-			throw new IOException("invoke() failed", exception);
+			throw new IOException("Error while sending value.", exception);
 		}
 		ListenableFuture<V> result = send(value);
+		updatesPending.incrementAndGet();
 		Futures.addCallback(result, callback);
 	}
 
 	public abstract ListenableFuture<V> send(IN value);
 
 	@Override
-	public void close() {
+	public void close() throws Exception {
 		try {
-			if (session != null) {
-				session.close();
+			if (exception != null) {
+				throw new IOException("Error while sending value.", exception);
 			}
-		} catch (Exception e) {
-			LOG.error("Error while closing session.", e);
-		}
-		try {
-			if (cluster != null) {
-				cluster.close();
+
+			while (updatesPending.get() > 0) {
+				synchronized (updatesPending) {
+					updatesPending.wait();
+				}
+			}
+			
+		} finally {
+			try {
+				if (session != null) {
+					session.close();
+				}
+			} catch (Exception e) {
+				LOG.error("Error while closing session.", e);
+			}
+			try {
+				if (cluster != null) {
+					cluster.close();
+				}
+			} catch (Exception e) {
+				LOG.error("Error while closing cluster.", e);
 			}
-		} catch (Exception e) {
-			LOG.error("Error while closing cluster.", e);
 		}
 	}
 }
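
Taken together, the base class now counts in-flight writes and blocks close() until every callback has fired, surfacing any asynchronous failure from invoke() or close(). A minimal standalone sketch of the same pattern, with CompletableFuture standing in for the driver's ListenableFuture; unlike the diff, the sketch re-checks the counter while holding the lock before waiting, so a notifyAll between the check and the wait cannot be missed:

----------------------------------------------------------------------
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

abstract class TrackingSinkSketch<IN> {

	private final AtomicInteger updatesPending = new AtomicInteger();
	private volatile Throwable exception;

	/** Issues the asynchronous write; provided by the concrete sink. */
	protected abstract CompletableFuture<?> send(IN value);

	public void invoke(IN value) throws IOException {
		if (exception != null) {
			throw new IOException("Error while sending value.", exception);
		}
		CompletableFuture<?> result = send(value);
		updatesPending.incrementAndGet();
		result.whenComplete((ignored, t) -> {
			if (t != null) {
				exception = t; // surfaced by the next invoke() or by close()
			}
			if (updatesPending.decrementAndGet() == 0) {
				synchronized (updatesPending) {
					updatesPending.notifyAll(); // wake a waiting close()
				}
			}
		});
	}

	public void close() throws Exception {
		// Wait until every callback has fired; the counter is re-checked
		// under the lock so the notification cannot slip past us.
		synchronized (updatesPending) {
			while (updatesPending.get() > 0) {
				updatesPending.wait();
			}
		}
		if (exception != null) {
			throw new IOException("Error while sending value.", exception);
		}
	}
}
----------------------------------------------------------------------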


[2/2] flink git commit: [FLINK-5101] Refactor CassandraConnectorITCase

Posted by ch...@apache.org.
[FLINK-5101] Refactor CassandraConnectorITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d6b5a67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d6b5a67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d6b5a67

Branch: refs/heads/release-1.3
Commit: 5d6b5a6773a795a3266b6298ae26d4f158ce18c4
Parents: a85b188
Author: zentol <ch...@apache.org>
Authored: Wed May 10 12:07:42 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 10 12:07:42 2017 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraConnectorITCase.java     | 213 ++++++++-----------
 1 file changed, 89 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d6b5a67/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f2e8f8b..06f3c35 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -28,39 +28,24 @@ import com.datastax.driver.core.Session;
 import org.apache.cassandra.service.CassandraDaemon;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.TestEnvironment;
 
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,6 +56,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.Random;
 import java.util.Scanner;
 import java.util.UUID;
 
@@ -100,12 +87,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	private static Cluster cluster;
 	private static Session session;
 
+	private static final String TABLE_NAME_PREFIX = "flink_";
+	private static final String TABLE_NAME_VARIABLE = "$TABLE";
 	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-	private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
-	private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
-	private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
-	private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
-	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
+	private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);";
+	private static final String INSERT_DATA_QUERY = "INSERT INTO flink." + TABLE_NAME_VARIABLE + " (id, counter, batch_id) VALUES (?, ?, ?)";
+	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';';
+
+	private static final Random random = new Random();
+	private int tableID;
 
 	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
 
@@ -129,12 +119,6 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		}
 	}
 
-	private static LocalFlinkMiniCluster flinkCluster;
-
-	// ------------------------------------------------------------------------
-	//  Cluster Setup (Cassandra & Flink)
-	// ------------------------------------------------------------------------
-
 	@BeforeClass
 	public static void startCassandra() throws IOException {
 
@@ -173,39 +157,39 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			cassandra.start();
 		}
 
-		try {
-			Thread.sleep(1000 * 10);
-		} catch (InterruptedException e) { //give cassandra a few seconds to start up
+		// start establishing a connection within 30 seconds
+		long start = System.nanoTime();
+		long deadline = start + 30_000_000_000L;
+		while (true) {
+			try {
+				cluster = builder.getCluster();
+				session = cluster.connect();
+				break;
+			} catch (Exception e) {
+				if (System.nanoTime() > deadline) {
+					throw e;
+				}
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException ignored) {
+				}
+			}
 		}
-
-		cluster = builder.getCluster();
-		session = cluster.connect();
+		LOG.debug("Connection established after {}ms.", System.currentTimeMillis() - start);
 
 		session.execute(CREATE_KEYSPACE_QUERY);
-		session.execute(CREATE_TABLE_QUERY);
-	}
-
-	@BeforeClass
-	public static void startFlink() throws Exception {
-		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
-		flinkCluster = new LocalFlinkMiniCluster(config);
-		flinkCluster.start();
+		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
 	}
 
-	@AfterClass
-	public static void stopFlink() {
-		if (flinkCluster != null) {
-			flinkCluster.stop();
-			flinkCluster = null;
-		}
+	@Before
+	public void createTable() {
+		tableID = random.nextInt(Integer.MAX_VALUE);
+		session.execute(injectTableName(CREATE_TABLE_QUERY));
 	}
 
 	@AfterClass
 	public static void closeCassandra() {
 		if (session != null) {
-			session.executeAsync(DROP_KEYSPACE_QUERY);
 			session.close();
 		}
 
@@ -224,28 +208,13 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	}
 
 	// ------------------------------------------------------------------------
-	//  Test preparation & cleanup
-	// ------------------------------------------------------------------------
-
-	@Before
-	public void initializeExecutionEnvironment() {
-		TestStreamEnvironment.setAsContext(flinkCluster, 4);
-		new TestEnvironment(flinkCluster, 4, false).setAsContext();
-	}
-
-	@After
-	public void deleteSchema() throws Exception {
-		session.executeAsync(CLEAR_TABLE_QUERY);
-	}
-
-	// ------------------------------------------------------------------------
 	//  Exactly-once Tests
 	// ------------------------------------------------------------------------
 
 	@Override
 	protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
 		return new CassandraTupleWriteAheadSink<>(
-			INSERT_DATA_QUERY,
+			injectTableName(INSERT_DATA_QUERY),
 			TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
 			builder,
 			new CassandraCommitter(builder));
@@ -264,7 +233,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	@Override
 	protected void verifyResultsIdealCircumstances(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
 			list.add(x);
@@ -279,7 +248,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	@Override
 	protected void verifyResultsDataPersistenceUponMissedNotify(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
 			list.add(x);
@@ -294,7 +263,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	@Override
 	protected void verifyResultsDataDiscardingUponRestore(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 20; x++) {
 			list.add(x);
@@ -324,7 +293,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		}
 
 		ArrayList<Integer> actual = new ArrayList<>();
-		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
 		for (Row s : result) {
 			actual.add(s.getInt("counter"));
 		}
@@ -335,16 +304,17 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 	@Test
 	public void testCassandraCommitter() throws Exception {
-		CassandraCommitter cc1 = new CassandraCommitter(builder);
-		cc1.setJobId("job");
+		String jobID = new JobID().toString();
+		CassandraCommitter cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+		cc1.setJobId(jobID);
 		cc1.setOperatorId("operator");
 
-		CassandraCommitter cc2 = new CassandraCommitter(builder);
-		cc2.setJobId("job");
+		CassandraCommitter cc2 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+		cc2.setJobId(jobID);
 		cc2.setOperatorId("operator");
 
-		CassandraCommitter cc3 = new CassandraCommitter(builder);
-		cc3.setJobId("job");
+		CassandraCommitter cc3 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+		cc3.setJobId(jobID);
 		cc3.setOperatorId("operator1");
 
 		cc1.createResource();
@@ -370,8 +340,8 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		cc2.close();
 		cc3.close();
 
-		cc1 = new CassandraCommitter(builder);
-		cc1.setJobId("job");
+		cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
+		cc1.setJobId(jobID);
 		cc1.setOperatorId("operator");
 
 		cc1.open();
@@ -389,70 +359,65 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 	@Test
 	public void testCassandraTupleAtLeastOnceSink() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
+		CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), builder);
 
-		DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
-		source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+		sink.open(new Configuration());
+
+		for (Tuple3<String, Integer, Integer> value : collection) {
+			sink.send(value);
+		}
 
-		env.execute();
+		sink.close();
 
-		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
 		Assert.assertEquals(20, rs.all().size());
 	}
 
 	@Test
 	public void testCassandraPojoAtLeastOnceSink() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStreamSource<Pojo> source = env
-			.addSource(new SourceFunction<Pojo>() {
-
-				private boolean running = true;
-				private volatile int cnt = 0;
-
-				@Override
-				public void run(SourceContext<Pojo> ctx) throws Exception {
-					while (running) {
-						ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
-						cnt++;
-						if (cnt == 20) {
-							cancel();
-						}
-					}
-				}
+		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
 
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			});
+		CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
 
-		source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
+		sink.open(new Configuration());
 
-		env.execute();
+		for (int x = 0; x < 20; x++) {
+			sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
+		}
 
-		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		sink.close();
+
+		ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
 		Assert.assertEquals(20, rs.all().size());
 	}
 
 	@Test
 	public void testCassandraBatchFormats() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
+		OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+		sink.configure(new Configuration());
+		sink.open(0, 1);
 
-		DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
-		dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
+		for (Tuple3<String, Integer, Integer> value : collection) {
+			sink.writeRecord(value);
+		}
 
-		env.execute("Write data");
+		sink.close();
 
-		DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
-			new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
-			TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
+		InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder);
+		source.configure(new Configuration());
+		source.open(null);
 
+		List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
+
+		while (!source.reachedEnd()) {
+			result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
+		}
+
+		source.close();
+		Assert.assertEquals(20, result.size());
+	}
 
-		long count = inputDS.count();
-		Assert.assertEquals(count, 20L);
+	private String injectTableName(String target) {
+		return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
 	}
 }
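
The refactored tests drive the sinks and formats directly instead of submitting jobs to a shared Flink mini-cluster, and isolate each test in its own table. A minimal sketch of the table-name injection idea, with the names taken from the diff:

----------------------------------------------------------------------
import java.util.Random;

// Sketch: each test instance picks a random table ID, and every query
// template carries a $TABLE placeholder that is resolved per test.
class TableNamePerTest {
	private static final String TABLE_NAME_PREFIX = "flink_";
	private static final String TABLE_NAME_VARIABLE = "$TABLE";
	private static final Random random = new Random();

	private final int tableID = random.nextInt(Integer.MAX_VALUE);

	String injectTableName(String target) {
		return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
	}
}
----------------------------------------------------------------------

Because every test writes to a fresh flink_<id> table created in @Before, the old TRUNCATE/DROP cleanup steps become unnecessary.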