You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/06/27 10:20:18 UTC
flink git commit: [FLINK-4119] Refactor null checks in Cassandra IOF
Repository: flink
Updated Branches:
refs/heads/master 6744b852d -> 9487fcbfb
[FLINK-4119] Refactor null checks in Cassandra IOF (Input/Output Formats)
This closes #2163
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9487fcbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9487fcbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9487fcbf
Branch: refs/heads/master
Commit: 9487fcbfbae85caf82729e0dda5403fab2fb7d84
Parents: 6744b85
Author: Andrea Sella <an...@radicalbit.io>
Authored: Sat Jun 25 18:03:20 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jun 27 12:19:02 2016 +0200
----------------------------------------------------------------------
.../cassandra/CassandraInputFormat.java | 22 +++++++++++---------
.../cassandra/CassandraOutputFormat.java | 22 +++++++++++---------
2 files changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9487fcbf/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
index 6818288..849e023 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,12 +53,9 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
private transient ResultSet resultSet;
public CassandraInputFormat(String query, ClusterBuilder builder) {
- if (Strings.isNullOrEmpty(query)) {
- throw new IllegalArgumentException("Query cannot be null or empty");
- }
- if (builder == null) {
- throw new IllegalArgumentException("Builder cannot be null.");
- }
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
+ Preconditions.checkArgument(builder != null, "Builder cannot be null");
+
this.query = query;
this.builder = builder;
}
@@ -115,15 +113,19 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
@Override
public void close() throws IOException {
try {
- session.close();
+ if (session != null) {
+ session.close();
+ }
} catch (Exception e) {
- LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
+ LOG.error("Error while closing session.", e);
}
try {
- cluster.close();
+ if (cluster != null ) {
+ cluster.close();
+ }
} catch (Exception e) {
- LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
+ LOG.error("Error while closing cluster.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9487fcbf/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
index 116db89..15d8fb3 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,12 +52,9 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
private transient Throwable exception = null;
public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
- if (Strings.isNullOrEmpty(insertQuery)) {
- throw new IllegalArgumentException("insertQuery cannot be null or empty");
- }
- if (builder == null) {
- throw new IllegalArgumentException("Builder cannot be null.");
- }
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty");
+ Preconditions.checkArgument(builder != null, "Builder cannot be null");
+
this.insertQuery = insertQuery;
this.builder = builder;
}
@@ -109,15 +107,19 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
@Override
public void close() throws IOException {
try {
- session.close();
+ if (session != null) {
+ session.close();
+ }
} catch (Exception e) {
- LOG.warn("Inputformat couldn't be closed.", e);
+ LOG.error("Error while closing session.", e);
}
try {
- cluster.close();
+ if (cluster != null ) {
+ cluster.close();
+ }
} catch (Exception e) {
- LOG.warn("Inputformat couldn't be closed." , e);
+ LOG.error("Error while closing cluster.", e);
}
}
}