You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by te...@apache.org on 2016/09/07 04:18:15 UTC

bahir-flink git commit: BAHIR-59[AMQ] Fix constructor visibility and error messages (rmetzger)

Repository: bahir-flink
Updated Branches:
  refs/heads/master 1f839d510 -> a105a7c34


BAHIR-59[AMQ] Fix constructor visibility and error messages (rmetzger)


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/a105a7c3
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/a105a7c3
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/a105a7c3

Branch: refs/heads/master
Commit: a105a7c3455b9851f951506ba91f1472002d323f
Parents: 1f839d5
Author: tedyu <yu...@gmail.com>
Authored: Tue Sep 6 21:15:39 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Sep 6 21:15:39 2016 -0700

----------------------------------------------------------------------
 .../streaming/connectors/activemq/AMQSinkConfig.java      |  6 +++---
 .../flink/streaming/connectors/activemq/AMQSource.java    |  4 ++--
 .../streaming/connectors/activemq/AMQSourceConfig.java    | 10 +++++-----
 3 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a105a7c3/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
index 86254ff..e10c3c8 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
@@ -34,9 +34,9 @@ public class AMQSinkConfig<IN> {
     public AMQSinkConfig(ActiveMQConnectionFactory connectionFactory, String queueName,
                         SerializationSchema<IN> serializationSchema, boolean persistentDelivery,
                         DestinationType destinationType) {
-        this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
-        this.queueName = Preconditions.checkNotNull(queueName, "destinationName");
-        this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
+        this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory not set");
+        this.queueName = Preconditions.checkNotNull(queueName, "destinationName not set");
+        this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
         this.persistentDelivery = persistentDelivery;
         this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
     }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a105a7c3/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
index 49f2cf7..e64b8fd 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -92,7 +92,7 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
      *
      * @param config AMQSource configuration
      */
-    AMQSource(AMQSourceConfig<OUT> config) {
+    public AMQSource(AMQSourceConfig<OUT> config) {
         super(String.class);
         this.connectionFactory = config.getConnectionFactory();
         this.destinationName = config.getDestinationName();
@@ -218,7 +218,7 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
 
             Message message = consumer.receive(1000);
             if (! (message instanceof BytesMessage)) {
-                LOG.warn("Active MQ source received non bytes message: {}");
+                LOG.warn("Active MQ source received non bytes message: {}", message);
                 return;
             }
             BytesMessage bytesMessage = (BytesMessage) message;

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a105a7c3/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
index 2dcb2cb..dd73b0e 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
@@ -37,11 +37,11 @@ public class AMQSourceConfig<OUT> {
     AMQSourceConfig(ActiveMQConnectionFactory connectionFactory, String destinationName,
                     DeserializationSchema<OUT> deserializationSchema, RunningChecker runningChecker,
                     DestinationType destinationType) {
-        this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
-        this.destinationName = Preconditions.checkNotNull(destinationName, "destinationName");
-        this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
-        this.runningChecker = Preconditions.checkNotNull(runningChecker, "runningChecker");
-        this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
+        this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory not set");
+        this.destinationName = Preconditions.checkNotNull(destinationName, "destinationName not set");
+        this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema not set");
+        this.runningChecker = Preconditions.checkNotNull(runningChecker, "runningChecker not set");
+        this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType not set");
     }
 
     public ActiveMQConnectionFactory getConnectionFactory() {