You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by te...@apache.org on 2016/09/07 04:18:15 UTC
bahir-flink git commit: BAHIR-59[AMQ] Fix constructor visibility and error messages (rmetzger)
Repository: bahir-flink
Updated Branches:
refs/heads/master 1f839d510 -> a105a7c34
BAHIR-59[AMQ] Fix constructor visibility and error messages (rmetzger)
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/a105a7c3
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/a105a7c3
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/a105a7c3
Branch: refs/heads/master
Commit: a105a7c3455b9851f951506ba91f1472002d323f
Parents: 1f839d5
Author: tedyu <yu...@gmail.com>
Authored: Tue Sep 6 21:15:39 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Sep 6 21:15:39 2016 -0700
----------------------------------------------------------------------
.../streaming/connectors/activemq/AMQSinkConfig.java | 6 +++---
.../flink/streaming/connectors/activemq/AMQSource.java | 4 ++--
.../streaming/connectors/activemq/AMQSourceConfig.java | 10 +++++-----
3 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a105a7c3/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
index 86254ff..e10c3c8 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
@@ -34,9 +34,9 @@ public class AMQSinkConfig<IN> {
public AMQSinkConfig(ActiveMQConnectionFactory connectionFactory, String queueName,
SerializationSchema<IN> serializationSchema, boolean persistentDelivery,
DestinationType destinationType) {
- this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
- this.queueName = Preconditions.checkNotNull(queueName, "destinationName");
- this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
+ this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory not set");
+ this.queueName = Preconditions.checkNotNull(queueName, "destinationName not set");
+ this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
this.persistentDelivery = persistentDelivery;
this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a105a7c3/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
index 49f2cf7..e64b8fd 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -92,7 +92,7 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
*
* @param config AMQSource configuration
*/
- AMQSource(AMQSourceConfig<OUT> config) {
+ public AMQSource(AMQSourceConfig<OUT> config) {
super(String.class);
this.connectionFactory = config.getConnectionFactory();
this.destinationName = config.getDestinationName();
@@ -218,7 +218,7 @@ public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String>
Message message = consumer.receive(1000);
if (! (message instanceof BytesMessage)) {
- LOG.warn("Active MQ source received non bytes message: {}");
+ LOG.warn("Active MQ source received non bytes message: {}", message);
return;
}
BytesMessage bytesMessage = (BytesMessage) message;
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a105a7c3/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
index 2dcb2cb..dd73b0e 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
@@ -37,11 +37,11 @@ public class AMQSourceConfig<OUT> {
AMQSourceConfig(ActiveMQConnectionFactory connectionFactory, String destinationName,
DeserializationSchema<OUT> deserializationSchema, RunningChecker runningChecker,
DestinationType destinationType) {
- this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
- this.destinationName = Preconditions.checkNotNull(destinationName, "destinationName");
- this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
- this.runningChecker = Preconditions.checkNotNull(runningChecker, "runningChecker");
- this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
+ this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory not set");
+ this.destinationName = Preconditions.checkNotNull(destinationName, "destinationName not set");
+ this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema not set");
+ this.runningChecker = Preconditions.checkNotNull(runningChecker, "runningChecker not set");
+ this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType not set");
}
public ActiveMQConnectionFactory getConnectionFactory() {