You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/01 17:54:03 UTC

[8/8] flink git commit: [FLINK-4290] [Cassandra Connector] Skip CassandraConnectorTest on Java 7 builds

[FLINK-4290] [Cassandra Connector] Skip CassandraConnectorTest on Java 7 builds

The Cassandra version used by the connector test needs Java 8 or later to run reliably, so the test is skipped on Java 7 builds.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/873d6cd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/873d6cd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/873d6cd1

Branch: refs/heads/master
Commit: 873d6cd184d7e4f3ad314eefa273b88e08ea6b0a
Parents: 0ea2596
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 16:55:02 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200

----------------------------------------------------------------------
 .../runtime/testutils/CommonTestUtils.java      |  9 +-
 .../cassandra/CassandraConnectorTest.java       | 92 ++++++++++++++------
 2 files changed, 67 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/873d6cd1/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 6bd8b34..59c37b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -51,7 +51,7 @@ public class CommonTestUtils {
 			try {
 				Thread.sleep(remaining);
 			}
-			catch (InterruptedException e) {}
+			catch (InterruptedException ignored) {}
 			
 			now = System.currentTimeMillis();
 		}
@@ -137,8 +137,7 @@ public class CommonTestUtils {
 	}
 
 	public static void printLog4jDebugConfig(File file) throws IOException {
-		FileWriter fw = new FileWriter(file);
-		try {
+		try (FileWriter fw = new FileWriter(file)) {
 			PrintWriter writer = new PrintWriter(fw);
 
 			writer.println("log4j.rootLogger=DEBUG, console");
@@ -152,9 +151,6 @@ public class CommonTestUtils {
 			writer.flush();
 			writer.close();
 		}
-		finally {
-			fw.close();
-		}
 	}
 
 	public static File createTempDirectory() throws IOException {
@@ -165,7 +161,6 @@ public class CommonTestUtils {
 			if (!dir.exists() && dir.mkdirs()) {
 				return dir;
 			}
-			System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
 		}
 
 		throw new IOException("Could not create temporary file directory");

http://git-wip-us.apache.org/repos/asf/flink/blob/873d6cd1/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
index 8d0c02e..2018255 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -6,15 +6,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.core.Cluster;
@@ -23,9 +24,12 @@ import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+
 import org.apache.cassandra.service.CassandraDaemon;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -42,6 +46,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -49,10 +54,12 @@ import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.internal.AssumptionViolatedException;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,16 +71,21 @@ import java.util.ArrayList;
 import java.util.Scanner;
 import java.util.UUID;
 
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+
 	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
 	private static File tmpDir;
 
 	private static final boolean EMBEDDED = true;
+
 	private static EmbeddedCassandraService cassandra;
-	private transient static ClusterBuilder builder = new ClusterBuilder() {
+
+	private static ClusterBuilder builder = new ClusterBuilder() {
 		@Override
 		protected Cluster buildCluster(Cluster.Builder builder) {
 			return builder
@@ -83,6 +95,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 				.withoutMetrics().build();
 		}
 	};
+
 	private static Cluster cluster;
 	private static Session session;
 
@@ -97,7 +110,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 	static {
 		for (int i = 0; i < 20; i++) {
-			collection.add(new Tuple3<>("" + UUID.randomUUID(), i, 0));
+			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
 		}
 	}
 
@@ -115,15 +128,36 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 		}
 	}
 
-	//=====Setup========================================================================================================
+	// ------------------------------------------------------------------------
+	//  Cassandra Cluster Setup
+	// ------------------------------------------------------------------------
+
 	@BeforeClass
 	public static void startCassandra() throws IOException {
-		//generate temporary files
+
+		// check if we should run this test, current Cassandra version requires Java >= 1.8
+		try {
+			String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
+			float javaVersion = Float.parseFloat(javaVersionString);
+			Assume.assumeTrue(javaVersion >= 1.8f);
+		}
+		catch (AssumptionViolatedException e) {
+			System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+");
+			throw e;
+		}
+		catch (Exception e) {
+			LOG.error("Cannot determine Java version", e);
+			e.printStackTrace();
+			fail("Cannot determine Java version");
+		}
+
+		// generate temporary files
 		tmpDir = CommonTestUtils.createTempDirectory();
-		ClassLoader classLoader = CassandraTupleWriteAheadSink.class.getClassLoader();
+		ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader();
 		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
 		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-		tmp.createNewFile();
+		
+		assertTrue(tmp.createNewFile());
 		BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
 
 		//copy cassandra.yaml; inject absolute paths into cassandra.yaml
@@ -139,7 +173,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 		// Tell cassandra where the configuration files are.
 		// Use the test configuration file.
-		System.setProperty("cassandra.config", "file:" + File.separator + File.separator + File.separator + tmp.getAbsolutePath());
+		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
 
 		if (EMBEDDED) {
 			cassandra = new EmbeddedCassandraService();
@@ -160,13 +194,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 	@Before
 	public void checkIfIgnore() {
-		String runtime = System.getProperty("java.runtime.name");
-				String version = System.getProperty("java.runtime.version");
-		LOG.info("Running tests on runtime: '{}', version: '{}'", runtime, version);
-		// The tests are failing on Oracle JDK 7 on Travis due to garbage collection issues.
-		// Oracle JDK identifies itself as "Java(TM) SE Runtime Environment"
-		// OpenJDK is "OpenJDK Runtime Environment"
-		Assume.assumeFalse(runtime.startsWith("Java") && version.startsWith("1.7"));
+		
 	}
 
 	@After
@@ -176,13 +204,23 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 	@AfterClass
 	public static void closeCassandra() {
-		session.executeAsync(DROP_KEYSPACE_QUERY);
-		session.close();
-		cluster.close();
-		if (EMBEDDED) {
+		if (session != null) {
+			session.executeAsync(DROP_KEYSPACE_QUERY);
+			session.close();
+		}
+
+		if (cluster != null) {
+			cluster.close();
+		}
+
+		if (cassandra != null) {
 			cassandra.stop();
 		}
-		tmpDir.delete();
+
+		if (tmpDir != null) {
+			//noinspection ResultOfMethodCallIgnored
+			tmpDir.delete();
+		}
 	}
 
 	//=====Exactly-Once=================================================================================================
@@ -202,7 +240,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 	@Override
 	protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
-		return new Tuple3<>("" + UUID.randomUUID(), counter, checkpointID);
+		return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
 	}
 
 	@Override
@@ -379,7 +417,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
 
 		DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
 			new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
-			new TupleTypeInfo(Tuple3.class, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
+			TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
 
 
 		long count = inputDS.count();