You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 10:20:18 UTC

flink git commit: [FLINK-4119] Refactor null checks in Cassandra IOF

Repository: flink
Updated Branches:
  refs/heads/master 6744b852d -> 9487fcbfb


[FLINK-4119] Refactor null checks in Cassandra Input/Output Formats (IOF)

This closes #2163


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9487fcbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9487fcbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9487fcbf

Branch: refs/heads/master
Commit: 9487fcbfbae85caf82729e0dda5403fab2fb7d84
Parents: 6744b85
Author: Andrea Sella <an...@radicalbit.io>
Authored: Sat Jun 25 18:03:20 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 12:19:02 2016 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraInputFormat.java         | 22 +++++++++++---------
 .../cassandra/CassandraOutputFormat.java        | 22 +++++++++++---------
 2 files changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9487fcbf/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
index 6818288..849e023 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,12 +53,9 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
 	private transient ResultSet resultSet;
 
 	public CassandraInputFormat(String query, ClusterBuilder builder) {
-		if (Strings.isNullOrEmpty(query)) {
-			throw new IllegalArgumentException("Query cannot be null or empty");
-		}
-		if (builder == null) {
-			throw new IllegalArgumentException("Builder cannot be null.");
-		}
+		Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
+		Preconditions.checkArgument(builder != null, "Builder cannot be null");
+
 		this.query = query;
 		this.builder = builder;
 	}
@@ -115,15 +113,19 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
 	@Override
 	public void close() throws IOException {
 		try {
-			session.close();
+			if (session != null) {
+				session.close();
+			}
 		} catch (Exception e) {
-			LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
+			LOG.error("Error while closing session.", e);
 		}
 
 		try {
-			cluster.close();
+			if (cluster != null ) {
+				cluster.close();
+			}
 		} catch (Exception e) {
-			LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
+			LOG.error("Error while closing cluster.", e);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9487fcbf/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
index 116db89..15d8fb3 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,12 +52,9 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
 	private transient Throwable exception = null;
 
 	public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
-		if (Strings.isNullOrEmpty(insertQuery)) {
-			throw new IllegalArgumentException("insertQuery cannot be null or empty");
-		}
-		if (builder == null) {
-			throw new IllegalArgumentException("Builder cannot be null.");
-		}
+		Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty");
+		Preconditions.checkArgument(builder != null, "Builder cannot be null");
+
 		this.insertQuery = insertQuery;
 		this.builder = builder;
 	}
@@ -109,15 +107,19 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
 	@Override
 	public void close() throws IOException {
 		try {
-			session.close();
+			if (session != null) {
+				session.close();
+			}
 		} catch (Exception e) {
-			LOG.warn("Inputformat couldn't be closed.", e);
+			LOG.error("Error while closing session.", e);
 		}
 
 		try {
-			cluster.close();
+			if (cluster != null ) {
+				cluster.close();
+			}
 		} catch (Exception e) {
-			LOG.warn("Inputformat couldn't be closed." , e);
+			LOG.error("Error while closing cluster.", e);
 		}
 	}
 }