You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/01 17:54:03 UTC
[8/8] flink git commit: [FLINK-4290] [Cassandra Connector] Skip
CassandraConnectorTest on Java 7 builds
[FLINK-4290] [Cassandra Connector] Skip CassandraConnectorTest on Java 7 builds
Cassandra needs Java 8 to run reliably.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/873d6cd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/873d6cd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/873d6cd1
Branch: refs/heads/master
Commit: 873d6cd184d7e4f3ad314eefa273b88e08ea6b0a
Parents: 0ea2596
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 16:55:02 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200
----------------------------------------------------------------------
.../runtime/testutils/CommonTestUtils.java | 9 +-
.../cassandra/CassandraConnectorTest.java | 92 ++++++++++++++------
2 files changed, 67 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/873d6cd1/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 6bd8b34..59c37b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -51,7 +51,7 @@ public class CommonTestUtils {
try {
Thread.sleep(remaining);
}
- catch (InterruptedException e) {}
+ catch (InterruptedException ignored) {}
now = System.currentTimeMillis();
}
@@ -137,8 +137,7 @@ public class CommonTestUtils {
}
public static void printLog4jDebugConfig(File file) throws IOException {
- FileWriter fw = new FileWriter(file);
- try {
+ try (FileWriter fw = new FileWriter(file)) {
PrintWriter writer = new PrintWriter(fw);
writer.println("log4j.rootLogger=DEBUG, console");
@@ -152,9 +151,6 @@ public class CommonTestUtils {
writer.flush();
writer.close();
}
- finally {
- fw.close();
- }
}
public static File createTempDirectory() throws IOException {
@@ -165,7 +161,6 @@ public class CommonTestUtils {
if (!dir.exists() && dir.mkdirs()) {
return dir;
}
- System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
}
throw new IOException("Could not create temporary file directory");
http://git-wip-us.apache.org/repos/asf/flink/blob/873d6cd1/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
index 8d0c02e..2018255 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.cassandra;
import com.datastax.driver.core.Cluster;
@@ -23,9 +24,12 @@ import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+
import org.apache.cassandra.service.CassandraDaemon;
+
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -42,6 +46,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -49,10 +54,12 @@ import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.internal.AssumptionViolatedException;
import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
+
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,16 +71,21 @@ import java.util.ArrayList;
import java.util.Scanner;
import java.util.UUID;
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+
private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
private static File tmpDir;
private static final boolean EMBEDDED = true;
+
private static EmbeddedCassandraService cassandra;
- private transient static ClusterBuilder builder = new ClusterBuilder() {
+
+ private static ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder
@@ -83,6 +95,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
.withoutMetrics().build();
}
};
+
private static Cluster cluster;
private static Session session;
@@ -97,7 +110,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
static {
for (int i = 0; i < 20; i++) {
- collection.add(new Tuple3<>("" + UUID.randomUUID(), i, 0));
+ collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
}
}
@@ -115,15 +128,36 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
}
}
- //=====Setup========================================================================================================
+ // ------------------------------------------------------------------------
+ // Cassandra Cluster Setup
+ // ------------------------------------------------------------------------
+
@BeforeClass
public static void startCassandra() throws IOException {
- //generate temporary files
+
+ // check if we should run this test, current Cassandra version requires Java >= 1.8
+ try {
+ String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
+ float javaVersion = Float.parseFloat(javaVersionString);
+ Assume.assumeTrue(javaVersion >= 1.8f);
+ }
+ catch (AssumptionViolatedException e) {
+ System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+");
+ throw e;
+ }
+ catch (Exception e) {
+ LOG.error("Cannot determine Java version", e);
+ e.printStackTrace();
+ fail("Cannot determine Java version");
+ }
+
+ // generate temporary files
tmpDir = CommonTestUtils.createTempDirectory();
- ClassLoader classLoader = CassandraTupleWriteAheadSink.class.getClassLoader();
+ ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader();
File file = new File(classLoader.getResource("cassandra.yaml").getFile());
File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
- tmp.createNewFile();
+
+ assertTrue(tmp.createNewFile());
BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
//copy cassandra.yaml; inject absolute paths into cassandra.yaml
@@ -139,7 +173,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
// Tell cassandra where the configuration files are.
// Use the test configuration file.
- System.setProperty("cassandra.config", "file:" + File.separator + File.separator + File.separator + tmp.getAbsolutePath());
+ System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
if (EMBEDDED) {
cassandra = new EmbeddedCassandraService();
@@ -160,13 +194,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
@Before
public void checkIfIgnore() {
- String runtime = System.getProperty("java.runtime.name");
- String version = System.getProperty("java.runtime.version");
- LOG.info("Running tests on runtime: '{}', version: '{}'", runtime, version);
- // The tests are failing on Oracle JDK 7 on Travis due to garbage collection issues.
- // Oracle JDK identifies itself as "Java(TM) SE Runtime Environment"
- // OpenJDK is "OpenJDK Runtime Environment"
- Assume.assumeFalse(runtime.startsWith("Java") && version.startsWith("1.7"));
+
}
@After
@@ -176,13 +204,23 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
@AfterClass
public static void closeCassandra() {
- session.executeAsync(DROP_KEYSPACE_QUERY);
- session.close();
- cluster.close();
- if (EMBEDDED) {
+ if (session != null) {
+ session.executeAsync(DROP_KEYSPACE_QUERY);
+ session.close();
+ }
+
+ if (cluster != null) {
+ cluster.close();
+ }
+
+ if (cassandra != null) {
cassandra.stop();
}
- tmpDir.delete();
+
+ if (tmpDir != null) {
+ //noinspection ResultOfMethodCallIgnored
+ tmpDir.delete();
+ }
}
//=====Exactly-Once=================================================================================================
@@ -202,7 +240,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
@Override
protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
- return new Tuple3<>("" + UUID.randomUUID(), counter, checkpointID);
+ return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
}
@Override
@@ -379,7 +417,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String
DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
- new TupleTypeInfo(Tuple3.class, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
+ TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
long count = inputDS.count();