Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:24 UTC
[23/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/KafkaProducer.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/KafkaProducer.java
new file mode 100644
index 0000000..103d61a
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/KafkaProducer.java
@@ -0,0 +1,168 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import org.apache.edgent.connectors.kafka.runtime.KafkaProducerConnector;
+import org.apache.edgent.connectors.kafka.runtime.KafkaPublisher;
+
+/**
+ * {@code KafkaProducer} is a connector for publishing a stream of tuples
+ * to topics in the Apache Kafka messaging system.
+ * <p>
+ * The connector uses and includes components from the Kafka 0.8.2.2 release.
+ * It has been successfully tested against a kafka_2.11-0.9.0.0 server as well.
+ * For more information about Kafka see
+ * <a href="http://kafka.apache.org">http://kafka.apache.org</a>
+ * <p>
+ * Sample use:
+ * <pre>{@code
+ * String bootstrapServers = "localhost:9092";
+ * String topic = "mySensorReadingsTopic";
+ *
+ * Map<String,Object> config = new HashMap<>();
+ * config.put("bootstrap.servers", bootstrapServers);
+ *
+ * Topology t = ...
+ * KafkaProducer kafka = new KafkaProducer(t, () -> config);
+ *
+ * TStream<JsonObject> sensorReadings = t.poll(
+ * () -> getSensorReading(), 5, TimeUnit.SECONDS);
+ *
+ * // publish the sensor readings as JSON
+ * kafka.publish(sensorReadings.map(tuple -> tuple.toString()), topic);
+ * }</pre>
+ */
+public class KafkaProducer {
+ @SuppressWarnings("unused")
+ private final Topology t;
+ private final KafkaProducerConnector connector;
+
+ /**
+ * Create a producer connector for publishing tuples to Kafka topics.
+ * <p>
+ * See the Apache Kafka documentation for {@code KafkaProducer}
+ * configuration properties at <a href="http://kafka.apache.org">http://kafka.apache.org</a>.
+ * Configuration property values are strings.
+ * <p>
+ * The Kafka "New Producer configs" are used. Minimal configuration
+ * typically includes:
+ * <ul>
+ * <li><code>bootstrap.servers</code></li>
+ * </ul>
+ * <p>
+ * The full set of producer configuration items is specified in
+ * {@code org.apache.kafka.clients.producer.ProducerConfig}.
+ *
+ * @param t Topology to add to
+ * @param config KafkaProducer configuration information.
+ */
+ public KafkaProducer(Topology t, Supplier<Map<String,Object>> config) {
+ this.t = t;
+ connector = new KafkaProducerConnector(config);
+ }
+
+ /**
+ * Publish the stream of tuples as Kafka key/value records
+ * to the specified topic partitions.
+ * <p>
+ * If a valid partition number is specified, that partition will be used
+ * when sending the record. If no partition is specified but a key is
+ * present, a partition will be chosen using a hash of the key.
+ * If neither key nor partition is present, a partition will be assigned
+ * in a round-robin fashion.
+ *
+ * @param <T> Tuple type
+ * @param stream the stream to publish
+ * @param keyFn A function that yields the optional byte[]
+ * key for the Kafka record from the tuple.
+ * Specify null or return a null value for no key.
+ * @param valueFn A function that yields the byte[]
+ * value for the Kafka record from the tuple.
+ * @param topicFn A function that yields the topic from the tuple.
+ * @param partitionFn A function that yields the optional topic
+ * partition specification from the tuple.
+ * Specify null or return a null value for no partition specification.
+ * @return {@link TSink}
+ */
+ public <T> TSink<T> publishBytes(TStream<T> stream, Function<T,byte[]> keyFn, Function<T,byte[]> valueFn, Function<T,String> topicFn, Function<T,Integer> partitionFn) {
+ return stream.sink(new KafkaPublisher<T>(connector, keyFn, valueFn, topicFn, partitionFn));
+ }
+
+ /**
+ * Publish the stream of tuples as Kafka key/value records
+ * to the specified partitions of the specified topics.
+ * <p>
+ * This is a convenience method for {@code String}-typed key/value
+ * conversion functions.
+ * <p>
+ * @param <T> Tuple type
+ * @param stream the stream to publish
+ * @param keyFn A function that yields the optional String
+ * key for the Kafka record from the tuple.
+ * Specify null or return a null value for no key.
+ * @param valueFn A function that yields the String
+ * value for the Kafka record from the tuple.
+ * @param topicFn A function that yields the topic from the tuple.
+ * @param partitionFn A function that yields the optional topic
+ * partition specification from the tuple.
+ * Specify null or return a null value for no partition specification.
+ * @return {@link TSink}
+ * @see #publishBytes(TStream, Function, Function, Function, Function)
+ */
+ public <T> TSink<T> publish(TStream<T> stream, Function<T,String> keyFn, Function<T,String> valueFn, Function<T,String> topicFn, Function<T,Integer> partitionFn) {
+ Function<T,byte[]> keyFn2 = null;
+ if (keyFn != null) {
+ keyFn2 = tuple -> { String key = keyFn.apply(tuple);
+ return key==null
+ ? null
+ : key.getBytes(StandardCharsets.UTF_8);
+ };
+ }
+ return publishBytes(stream, keyFn2,
+ tuple -> valueFn.apply(tuple).getBytes(StandardCharsets.UTF_8),
+ topicFn, partitionFn);
+ }
+
+ /**
+ * Publish the stream of tuples as Kafka key/value records
+ * to the specified partitions of the specified topics.
+ * <p>
+ * This is a convenience method for publishing a stream of String
+ * tuples as Kafka records with no key, a value consisting of the
+ * String tuple serialized as UTF-8, and round-robin publishing
+ * across the fixed topic's partitions.
+ *
+ * @param stream the stream to publish
+ * @param topic The topic to publish to
+ * @return {@link TSink}
+ * @see #publish(TStream, Function, Function, Function, Function)
+ */
+ public TSink<String> publish(TStream<String> stream, String topic) {
+ return publish(stream, null, tuple -> tuple, tuple -> topic, null);
+ }
+}
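
For reference, a minimal sketch (not part of this commit) of the most
general form, publishBytes(), with explicit key, value, topic, and
partition functions. It assumes Gson's JsonObject as in the sample use
above; the "deviceId" field name is hypothetical:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.edgent.connectors.kafka.KafkaProducer;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;

import com.google.gson.JsonObject;

public class PublishBytesSketch {
    static void wire(Topology t, TStream<JsonObject> readings) {
        Map<String,Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        KafkaProducer kafka = new KafkaProducer(t, () -> config);

        kafka.publishBytes(readings,
            // key: enables hash-based partition selection (null for no key)
            tuple -> tuple.get("deviceId").getAsString()
                          .getBytes(StandardCharsets.UTF_8),
            // value: the JSON object serialized as UTF-8
            tuple -> tuple.toString().getBytes(StandardCharsets.UTF_8),
            // topic chosen per tuple (fixed here)
            tuple -> "mySensorReadingsTopic",
            // partition: null lets Kafka choose (hash of key here)
            tuple -> null);
    }
}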
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/package-info.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/package-info.java
new file mode 100644
index 0000000..dd1689b
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/package-info.java
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Apache Kafka enterprise messaging hub stream connector.
+ * <p>
+ * The connector uses and includes components from the Kafka 0.8.2.2 release.
+ * It has been successfully tested against a kafka_2.11-0.9.0.0 server as well.
+ * <p>
+ * Stream tuples may be published to Kafka broker topics,
+ * and streams may be created by subscribing to broker topics.
+ * For more information about Kafka see
+ * <a href="http://kafka.apache.org">http://kafka.apache.org</a>
+ */
+package org.apache.edgent.connectors.kafka;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConnector.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConnector.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConnector.java
new file mode 100644
index 0000000..0575b13
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConnector.java
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConnector implements Serializable {
+ private static final long serialVersionUID = 1L;
+ protected static final Logger trace = LoggerFactory.getLogger(KafkaConnector.class);
+
+ static Logger getTrace() {
+ return trace;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
new file mode 100644
index 0000000..f5f879f
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
@@ -0,0 +1,263 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.function.Supplier;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+/**
+ * A connector for consuming Kafka key/value records.
+ */
+public class KafkaConsumerConnector extends KafkaConnector {
+ private static final long serialVersionUID = 1L;
+ private final Supplier<Map<String,Object>> configFn;
+ private String id;
+ // Map<subscriber,List<topic>>>
+ private final Map<KafkaSubscriber<?>,List<String>> subscribers = new HashMap<>();
+ private ConsumerConnector consumer;
+ private final Map<KafkaSubscriber<?>,ExecutorService> executors = new HashMap<>();
+
+ // Ugh. Turns out the new KafkaConsumer present in 8.2.2 isn't baked yet
+ // (e.g., its poll() just returns null).
+
+ public KafkaConsumerConnector(Supplier<Map<String, Object>> configFn) {
+ this.configFn = configFn;
+ }
+
+ // unbaked 8.2.2 KafkaConsumer
+// private synchronized KafkaConsumer<byte[],byte[]> client() {
+// if (consumer == null)
+// consumer = new KafkaConsumer<byte[],byte[]>(configFn.get(),
+// null, /*ConsumerRebalanceCallback*/
+// new ByteArrayDeserializer(), new ByteArrayDeserializer());
+// return consumer;
+// }
+
+ private synchronized ConsumerConnector client() {
+ if (consumer == null)
+ consumer = Consumer.createJavaConsumerConnector(
+ createConsumerConfig());
+ return consumer;
+ }
+
+ private ConsumerConfig createConsumerConfig() {
+ Map<String,Object> config = configFn.get();
+ Properties props = new Properties();
+ for (Entry<String,Object> e : config.entrySet()) {
+ props.put(e.getKey(), e.getValue().toString());
+ }
+ return new ConsumerConfig(props);
+ }
+
+ public synchronized void close(KafkaSubscriber<?> subscriber) {
+ trace.trace("{} closing subscriber {}", id(), subscriber);
+ // TODO hmm... really want to do consumer.shutdown() first
+ // to avoid InterruptedException from shutdown[Now] of
+ // consumer threads (in it.next()).
+ // Our issue is that we can have multiple Subscriber for a
+ // single ConsumerConnection.
+ // Look at streams.messaging to see how it handles this - not
+ // sure it does (e.g., may have only a single operator for a
+ // ConsumerConnection).
+ try {
+ ExecutorService executor = executors.remove(subscriber);
+ if (executor != null) {
+ executor.shutdownNow();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ if (executors.isEmpty()) {
+ trace.info("{} closing consumer", id());
+ if (consumer != null)
+ consumer.shutdown();
+ }
+ }
+ }
+
+ synchronized void addSubscriber(KafkaSubscriber<?> subscriber, String... topics) {
+ List<String> topicList = new ArrayList<>(Arrays.asList(topics));
+ checkSubscription(subscriber, topicList);
+ {
+ // In Kafka 0.8.2.2, ConsumerConnector.createMessageStreams() can
+ // only be called once for a connector instance.
+ // The analogous operation in Kafka 0.9.0.0 doesn't have such a
+ // restriction so for now just enforce a restriction rather than
+ // do the work to make this appear to work.
+ if (!subscribers.isEmpty())
+ throw new IllegalStateException("The KafkaConsumer connection already has a subscriber");
+ }
+ subscribers.put(subscriber, topicList);
+ }
+
+ // unbaked 8.2.2 KafkaConsumer
+// synchronized void addSubscriber(KafkaSubscriber<?> subscriber, TopicPartition... topicPartitions) {
+// checkSubscription(subscriber, (Object[]) topicPartitions);
+// isTopicSubscriptions = false;
+// for (TopicPartition topicPartition : topicPartitions) {
+// trace.info("{} addSubscriber for {}", id(), topicPartition);
+// subscribers.put(subscriber, topicPartition);
+// }
+// }
+
+ private void checkSubscription(KafkaSubscriber<?> subscriber, List<String> topics) {
+ if (topics.size() == 0)
+ throw new IllegalArgumentException("Subscription specification is empty");
+
+ // disallow dup subscriptions
+ Set<String> topicSet = new HashSet<>(topics);
+ if (topicSet.size() != topics.size())
+ throw new IllegalArgumentException("Duplicate subscription");
+
+ // check against existing subscriptions
+ topicSet.clear();
+ for (List<String> l : subscribers.values())
+ topicSet.addAll(l);
+ for (String topic : topics) {
+ if (topicSet.contains(topic))
+ throw new IllegalArgumentException("Duplicate subscription");
+ }
+ }
+
+ synchronized void start(KafkaSubscriber<?> subscriber) {
+ Map<String,Integer> topicCountMap = new HashMap<>();
+ int threadsPerTopic = 1;
+ int totThreadCnt = 0;
+ List<String> topics = subscribers.get(subscriber);
+ for (String topic : topics) {
+ topicCountMap.put(topic, threadsPerTopic);
+ totThreadCnt += threadsPerTopic;
+ }
+
+ Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap =
+ client().createMessageStreams(topicCountMap);
+
+ ExecutorService executor = Executors.newFixedThreadPool(totThreadCnt);
+ executors.put(subscriber, executor);
+
+ for (Entry<String,List<KafkaStream<byte[],byte[]>>> entry : consumerMap.entrySet()) {
+ String topic = entry.getKey();
+ int threadNum = 0;
+ for (KafkaStream<byte[],byte[]> stream : entry.getValue()) {
+ executor.submit(() -> {
+ try {
+ trace.info("{} started consumer thread {} for topic:{}", id(), threadNum, topic);
+ ConsumerIterator<byte[],byte[]> it = stream.iterator();
+ while (it.hasNext()) {
+ subscriber.accept(it.next());
+ }
+ }
+ catch (Throwable t) {
+ if (t instanceof InterruptedException) {
+ // normal close() termination
+ trace.trace("{} consumer for topic:{}. got exception", id(), topic, t);
+ }
+ else
+ trace.error("{} consumer for topic:{}. got exception", id(), topic, t);
+ }
+ finally {
+ trace.info("{} consumer thread {} for topic:{} exiting.", id(), threadNum, topic);
+ }
+ });
+ }
+ }
+ }
+
+ // unbaked 8.2.2 KafkaConsumer
+// synchronized void start(KafkaSubscriber<?> subscriber) {
+// List<Object> subscriptions = subscribers.get(subscriber);
+// trace.info("{} adding subscription for {}", id(), subscriptions);
+// if (subscriptions.get(0) instanceof String)
+// client().subscribe(subscriptions.toArray(new String[0]));
+// else
+// client().subscribe(subscriptions.toArray(new TopicPartition[0]));
+//
+// if (pollFuture == null) {
+// pollFuture = executor.submit(new Runnable() {
+// @Override
+// public void run() {
+// KafkaConsumerConnector.this.run();
+// }
+// });
+// }
+// }
+//
+// private void run() {
+// trace.info("{} poll thread running", id());
+// while (true) {
+// if (Thread.interrupted()) {
+// trace.info("{} poll thread terinating", id());
+// return;
+// }
+//
+// fetchAndProcess();
+// }
+// }
+//
+// private void fetchAndProcess() {
+// Map<String, ConsumerRecords<byte[],byte[]>> map = client().poll(2*1000);
+//
+// for (Entry<String,ConsumerRecords<byte[],byte[]>> e : map.entrySet()) {
+// KafkaSubscriber<?> subscriber = subscribers.get(e.getKey());
+// if (subscriber != null) {
+// for (ConsumerRecord<byte[],byte[]> rec : e.getValue().records()) {
+// trace.info/*trace*/("{} processing record for {}", id(), rec.topicAndPartition());
+// subscriber.accept(rec);
+// }
+// }
+// else {
+// // must be TopicPartition based subscription
+// for (ConsumerRecord<byte[],byte[]> rec : e.getValue().records()) {
+// subscriber = subscribers.get(rec.topicAndPartition());
+// trace.info/*trace*/("{} processing record for {}", id(), rec.topicAndPartition());
+// subscriber.accept(rec);
+// }
+// }
+// }
+// }
+
+ String id() {
+ if (id == null) {
+ // include our short object Id
+ id = "Kafka " + toString().substring(toString().indexOf('@') + 1);
+ }
+ return id;
+ }
+}
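
For reference, a minimal sketch (not part of this commit) of the
configuration this connector's 0.8.2.2 high-level consumer path expects:
zookeeper.connect and group.id are the required keys (bootstrap.servers
belongs to the not-yet-usable new KafkaConsumer client, per the comments
above). The group id value is hypothetical:

import java.util.HashMap;
import java.util.Map;

import org.apache.edgent.connectors.kafka.KafkaConsumer;
import org.apache.edgent.topology.Topology;

public class ConsumerConfigSketch {
    static KafkaConsumer newConsumer(Topology t) {
        // high-level consumer is configured via ZooKeeper, not brokers
        Map<String,Object> config = new HashMap<>();
        config.put("zookeeper.connect", "localhost:2181");
        config.put("group.id", "myConsumerGroupId");
        return new KafkaConsumer(t, () -> config);
    }
}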
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaProducerConnector.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaProducerConnector.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaProducerConnector.java
new file mode 100644
index 0000000..852cf70
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaProducerConnector.java
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import java.util.Map;
+
+import org.apache.edgent.function.Supplier;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+/**
+ * A connector for producing/publishing Kafka key/value records.
+ */
+public class KafkaProducerConnector extends KafkaConnector implements AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ private final Supplier<Map<String,Object>> configFn;
+ private String id;
+ private KafkaProducer<byte[],byte[]> producer;
+
+ public KafkaProducerConnector(Supplier<Map<String, Object>> configFn) {
+ this.configFn = configFn;
+ }
+
+ synchronized KafkaProducer<byte[],byte[]> client() {
+ if (producer == null)
+ producer = new KafkaProducer<byte[],byte[]>(configFn.get(),
+ new ByteArraySerializer(), new ByteArraySerializer());
+ return producer;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ if (producer != null)
+ producer.close();
+ }
+
+ String id() {
+ if (id == null) {
+ // include our short object Id
+ id = "Kafka " + toString().substring(toString().indexOf('@') + 1);
+ }
+ return id;
+ }
+}
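
One consequence of taking a Supplier rather than a Map: the configuration
is not evaluated until client() first runs at topology startup. A small
sketch (not part of this commit) of deferring a value that way; the system
property name is hypothetical:

import java.util.HashMap;
import java.util.Map;

import org.apache.edgent.function.Supplier;

public class LazyConfigSketch {
    static Supplier<Map<String,Object>> configFn() {
        // The map is built only when the connector first creates its
        // client, so values may be resolved late (e.g., from a system
        // property set after graph construction).
        return () -> {
            Map<String,Object> cfg = new HashMap<>();
            cfg.put("bootstrap.servers",
                    System.getProperty("bootstrap.servers", "localhost:9092"));
            return cfg;
        };
    }
}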
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaPublisher.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaPublisher.java
new file mode 100644
index 0000000..8d08262
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaPublisher.java
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+
+/**
+ * A publisher of Kafka key/value records from a stream of tuples of type {@code T}
+ *
+ * @param <T> tuple type
+ */
+public class KafkaPublisher<T> implements Consumer<T>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ private static final Logger trace = KafkaProducerConnector.getTrace();
+ private final Function<T, byte[]> keyFn;
+ private final Function<T, byte[]> valueFn;
+ private final Function<T, String> topicFn;
+ private final Function<T, Integer> partitionFn;
+ private String id;
+ private KafkaProducerConnector connector;
+
+ public KafkaPublisher(KafkaProducerConnector connector, Function<T, byte[]> keyFn,
+ Function<T, byte[]> valueFn, Function<T, String> topicFn,
+ Function<T, Integer> partitionFn) {
+
+ this.connector = connector;
+ if (keyFn == null)
+ keyFn = tuple -> null;
+ this.keyFn = keyFn;
+ this.valueFn = valueFn;
+ this.topicFn = topicFn;
+ if (partitionFn == null)
+ partitionFn = tuple -> null;
+ this.partitionFn = partitionFn;
+ }
+
+ @Override
+ public void accept(T t) {
+ String topic = topicFn.apply(t);
+ Integer partition = partitionFn.apply(t);
+ byte[] key = keyFn.apply(t);
+ byte[] value = valueFn.apply(t);
+ ProducerRecord<byte[],byte[]> rec = new ProducerRecord<>(
+ topic, partition, key, value);
+
+ trace.trace("{} sending rec to topic:{} partition:{}", id(), topic, partition);
+
+ // TODO add callback for trace of actual completion?
+
+ connector.client().send(rec); // async; doesn't throw
+ }
+
+ @Override
+ public void close() throws Exception {
+ connector.close();
+ }
+
+ private String id() {
+ if (id == null) {
+ // include our short object Id
+ id = connector.id() + " PUB " + toString().substring(toString().indexOf('@') + 1);
+ }
+ return id;
+ }
+}
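
A sketch (not part of this commit) of one way to address the
completion-trace TODO in accept(): the kafka-clients producer also offers
send(record, Callback), which reports per-record completion. This fragment
would replace the plain send(rec) call; Callback and RecordMetadata are
from org.apache.kafka.clients.producer:

import org.apache.kafka.clients.producer.RecordMetadata;

// inside accept(T t), replacing: connector.client().send(rec);
connector.client().send(rec, (RecordMetadata meta, Exception e) -> {
    if (e != null)
        trace.error("{} send to topic:{} failed", id(), topic, e);
    else
        trace.trace("{} sent rec to topic:{} partition:{} offset:{}",
                id(), meta.topic(), meta.partition(), meta.offset());
});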
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber.java
new file mode 100644
index 0000000..0a652e7
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber.java
@@ -0,0 +1,187 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+
+import org.apache.edgent.connectors.kafka.KafkaConsumer;
+import kafka.message.MessageAndMetadata;
+
+/**
+ * A consumer of Kafka key/value records that generates tuples of type {@code T}
+ *
+ * @param <T> tuple type
+ */
+public class KafkaSubscriber<T> implements Consumer<Consumer<T>>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ private static final Logger trace = KafkaConsumerConnector.getTrace();
+ private String id;
+ private final KafkaConsumerConnector connector;
+ private Function<ByteConsumerRecord, T> byteToTupleFn;
+ private Function<StringConsumerRecord, T> stringToTupleFn;
+ private final String[] topics;
+ private Consumer<T> eventSubmitter;
+
+ @SuppressWarnings("unchecked")
+ public KafkaSubscriber(KafkaConsumerConnector connector, Function<? extends KafkaConsumer.ConsumerRecord<?,?>, T> toTupleFn, boolean isStringFn, String... topics) {
+ this.connector = connector;
+ if (isStringFn)
+ stringToTupleFn = (Function<StringConsumerRecord, T>) toTupleFn;
+ else
+ byteToTupleFn = (Function<ByteConsumerRecord, T>) toTupleFn;
+ this.topics = topics;
+ connector.addSubscriber(this, this.topics);
+ }
+
+ // The explicit topicPartition style of subscription is part of the
+ // Kafka's new KafkaConsumer API and is unbaked as of 8.2.2
+// @SuppressWarnings("unchecked")
+// public KafkaSubscriber(KafkaConsumerConnector connector, Function<? extends KafkaConsumer.ConsumerRecord<?,?>,T> toTupleFn, boolean isStringFn, KafkaConsumer.TopicPartition... topicPartitions) {
+// this.connector = connector;
+// if (isStringFn)
+// stringToTupleFn = (Function<KafkaSubscriber<T>.StringConsumerRecord, T>) toTupleFn;
+// else
+// byteToTupleFn = (Function<KafkaSubscriber<T>.ByteConsumerRecord, T>) toTupleFn;
+// this.topics = null;
+// TopicPartition[] tps = new TopicPartition[topicPartitions.length];
+// int i = 0;
+// for (KafkaConsumer.TopicPartition tp : topicPartitions)
+// tps[i++] = new TopicPartition(tp.topic(), tp.partition());
+// this.topicPartitions = tps;
+// connector.addSubscriber(this, this.topicPartitions);
+// }
+
+ @Override
+ public void accept(Consumer<T> eventSubmitter) {
+ try {
+ this.eventSubmitter = eventSubmitter;
+ connector.start(this);
+ }
+ catch (Throwable t) {
+ trace.error("{} initialization failure", id(), t);
+ }
+ }
+
+ // unbaked 8.2.2 KafkaConsumer
+// void accept(ConsumerRecord<byte[],byte[]> rec) {
+// if (rec.error()) {
+// trace.error("{} error retrieving record.", id(), rec.error());
+// return;
+// }
+// try {
+// T tuple;
+// if (stringToTupleFn != null)
+// tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
+// else
+// tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
+// eventSubmitter.accept(tuple);
+// }
+// catch (Exception e) {
+// trace.error("{} failure processing record from {}", id(), rec.topicAndPartition(), e);
+// }
+// }
+
+ void accept(MessageAndMetadata<byte[],byte[]> rec) {
+ try {
+ trace.trace("{} received rec for topic:{} partition:{} offset:{}",
+ id(), rec.topic(), rec.partition(), rec.offset());
+ T tuple;
+ if (stringToTupleFn != null)
+ tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
+ else
+ tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
+ eventSubmitter.accept(tuple);
+ }
+ catch (Exception e) {
+ String tp = String.format("[%s,%d]", rec.topic(), rec.partition());
+ trace.error("{} failure processing record from {}", id(), tp, e);
+ }
+ }
+
+ private String id() {
+ if (id == null) {
+ // include our short object Id
+ id = connector.id() + " SUB " + toString().substring(toString().indexOf('@') + 1);
+ }
+ return id;
+ }
+
+ @Override
+ public void close() throws Exception {
+ connector.close(this);
+ }
+
+ private static abstract class ConsumerRecordBase<K,V>
+ implements KafkaConsumer.ConsumerRecord<K,V> {
+ protected final MessageAndMetadata<byte[], byte[]> rec;
+
+ ConsumerRecordBase(MessageAndMetadata<byte[], byte[]> rec) {
+ this.rec = rec;
+ }
+
+ public abstract K key();
+ public abstract V value();
+
+ @Override
+ public String topic() { return rec.topic(); }
+ @Override
+ public int partition() { return rec.partition(); }
+ @Override
+ public long offset() { return rec.offset(); }
+ }
+
+ private static class ByteConsumerRecord extends ConsumerRecordBase<byte[],byte[]>
+ implements KafkaConsumer.ByteConsumerRecord {
+
+ ByteConsumerRecord(MessageAndMetadata<byte[], byte[]> rec) {
+ super(rec);
+ }
+
+ @Override
+ public byte[] key() { return rec.key(); }
+ @Override
+ public byte[] value() { return rec.message(); }
+ }
+
+ private static class StringConsumerRecord extends ConsumerRecordBase<String,String>
+ implements KafkaConsumer.StringConsumerRecord {
+
+ StringConsumerRecord(MessageAndMetadata<byte[], byte[]> rec) {
+ super(rec);
+ }
+
+ @Override
+ public String key() {
+ byte[] key = rec.key();
+ if (key == null)
+ return null;
+ return new String(key, StandardCharsets.UTF_8);
+ }
+ @Override
+ public String value() {
+ return new String(rec.message(), StandardCharsets.UTF_8);
+ }
+ }
+
+}
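
For reference, a minimal sketch (not part of this commit) of the public
path that reaches this class: KafkaConsumer.subscribe() constructs a
KafkaSubscriber whose toTupleFn is handed StringConsumerRecord instances,
so the tuple function can use the accessors defined above. The topic name
matches the test default:

import org.apache.edgent.connectors.kafka.KafkaConsumer;
import org.apache.edgent.topology.TStream;

public class SubscribeSketch {
    static TStream<String> wire(KafkaConsumer consumer) {
        // key()/value() are UTF-8 decoded by StringConsumerRecord
        return consumer.subscribe(
                rec -> rec.topic() + ":" + rec.value(),
                "testTopic1");
    }
}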
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java b/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
deleted file mode 100644
index 9edb0a2..0000000
--- a/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.kafka;
-
-/**
- * KafkaStreams connector globalization tests.
- */
-public class KafkaStreamsGlobalTestManual extends KafkaStreamsTestManual {
- private static final String globalMsg1 = "\u4f60\u597d";
- private static final String globalMsg2 = "\u4f60\u5728\u55ce";
-
- public String getMsg1() {
- return globalMsg1;
- }
-
- public String getMsg2() {
- return globalMsg2;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsTestManual.java b/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
deleted file mode 100644
index 8f92fec..0000000
--- a/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.kafka;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import edgent.test.connectors.common.ConnectorTestBase;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-import edgent.connectors.kafka.KafkaConsumer;
-import edgent.connectors.kafka.KafkaProducer;
-
-public class KafkaStreamsTestManual extends ConnectorTestBase {
- private static final int PUB_DELAY_MSEC = 4*1000;
- private static final int SEC_TIMEOUT = 10;
- private final String BASE_GROUP_ID = "kafkaStreamsTestGroupId";
- private final String uniq = simpleTS();
- private final String msg1 = "Hello";
- private final String msg2 = "Are you there?";
-
- public String getMsg1() {
- return msg1;
- }
-
- public String getMsg2() {
- return msg2;
- }
-
- private String[] getKafkaTopics() {
- String csvTopics = System.getProperty("edgent.test.connectors.kafka.csvTopics", "testTopic1,testTopic2");
- String[] topics = csvTopics.split(",");
- return topics;
- }
-
- private String getKafkaBootstrapServers() {
- return System.getProperty("edgent.test.connectors.kafka.bootstrap.servers", "localhost:9092");
- }
-
- private String getKafkaZookeeperConnect() {
- return System.getProperty("edgent.test.connectors.kafka.zookeeper.connect", "localhost:2181");
- }
-
- private String newGroupId(String name) {
- String groupId = BASE_GROUP_ID + "_" + name + "_" + uniq.replaceAll(":", "");
- System.out.println("["+simpleTS()+"] "
- + "Using Kafka consumer group.id " + groupId);
- return groupId;
- }
-
- private Map<String,Object> newConsumerConfig(String groupId) {
- Map<String,Object> config = new HashMap<>();
- // unbaked 8.8.2 KafkaConsumer
-// config.put("bootstrap.servers", getKafkaBootstrapServers());
- config.put("zookeeper.connect", getKafkaZookeeperConnect());
- config.put("group.id", groupId);
- return config;
- }
-
- private Map<String,Object> newProducerConfig() {
- Map<String,Object> config = new HashMap<>();
- config.put("bootstrap.servers", getKafkaBootstrapServers());
- return config;
- }
-
- private static class Rec {
- String topic;
- int partition;
- String key;
- String value;
- Rec(String topic, int partition, String key, String value) {
- this.topic = topic;
- this.key = key;
- this.value = value;
- }
- public String toString() {
- return "topic:"+topic+" partition:"+partition+" key:"+key+" value:"+value;
- }
- }
-
- @Test
- public void testSimple() throws Exception {
- Topology t = newTopology("testSimple");
- MsgGenerator mgen = new MsgGenerator(t.getName());
- String topic = getKafkaTopics()[0];
- String groupId = newGroupId(t.getName());
- List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
-
- TStream<String> s = PlumbingStreams.blockingOneShotDelay(
- t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-
- Map<String,Object> pConfig = newProducerConfig();
- KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-
- TSink<String> sink = producer.publish(s, topic);
-
- Map<String,Object> cConfig = newConsumerConfig(groupId);
- KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-
- TStream<String> rcvd = consumer.subscribe(
- rec -> rec.value(),
- topic);
-
- completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
-
- assertNotNull(sink);
- }
-
- @Test
- public void testWithKey() throws Exception {
- Topology t = newTopology("testWithKey");
- MsgGenerator mgen = new MsgGenerator(t.getName());
- String topic = getKafkaTopics()[0];
- String groupId = newGroupId(t.getName());
- List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
- List<Rec> recs = new ArrayList<>();
- int i = 0;
- for (String msg : msgs) {
- recs.add(new Rec(topic, 0, "key-" + ++i, msg));
- }
- List<String> expected = new ArrayList<>();
- for (Rec rec : recs) {
- expected.add(rec.toString());
- }
-
- // Test publish with key
- // Also exercise ConsumerRecord accessors
-
- TStream<Rec> s = PlumbingStreams.blockingOneShotDelay(
- t.collection(recs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-
- Map<String,Object> pConfig = newProducerConfig();
- KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-
- producer.publish(s,
- tuple -> tuple.key,
- tuple -> tuple.value,
- tuple -> tuple.topic,
- tuple -> tuple.partition);
-
- Map<String,Object> cConfig = newConsumerConfig(groupId);
- KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-
- TStream<String> rcvd = consumer.subscribe(
- rec -> new Rec(rec.topic(),
- rec.partition(),
- rec.key(),
- rec.value()).toString(),
- topic);
-
- completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
- }
-
- @Test
- public void testPubSubBytes() throws Exception {
- Topology t = newTopology("testPubSubBytes");
- MsgGenerator mgen = new MsgGenerator(t.getName());
- String topic = getKafkaTopics()[0];
- String groupId = newGroupId(t.getName());
- List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
- List<Rec> recs = new ArrayList<>();
- int i = 0;
- for (String msg : msgs) {
- recs.add(new Rec(topic, 0, "key-" + ++i, msg));
- }
- List<String> expected = new ArrayList<>();
- for (Rec rec : recs) {
- expected.add(rec.toString());
- }
-
- // Test publishBytes() / subscribeBytes()
-
- TStream<Rec> s = PlumbingStreams.blockingOneShotDelay(
- t.collection(recs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-
- Map<String,Object> pConfig = newProducerConfig();
- KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-
- producer.publishBytes(s,
- tuple -> tuple.key.getBytes(StandardCharsets.UTF_8),
- tuple -> tuple.value.getBytes(StandardCharsets.UTF_8),
- tuple -> tuple.topic,
- tuple -> tuple.partition);
-
- Map<String,Object> cConfig = newConsumerConfig(groupId);
- KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-
- TStream<String> rcvd = consumer.subscribeBytes(
- rec -> new Rec(rec.topic(),
- rec.partition(),
- new String(rec.key(), StandardCharsets.UTF_8),
- new String(rec.value(), StandardCharsets.UTF_8)).toString(),
- topic);
-
- completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
- }
-
- @Test
- public void testMultiPub() throws Exception {
- Topology t = newTopology("testMultiPub");
- MsgGenerator mgen = new MsgGenerator(t.getName());
- String topic1 = getKafkaTopics()[0];
- String topic2 = getKafkaTopics()[1];
- String groupId = newGroupId(t.getName());
- List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
- List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
- List<String> msgs = new ArrayList<>(msgs1);
- msgs.addAll(msgs2);
-
- // Multiple publish() on a single connection.
- // Also multi-topic subscribe().
-
- TStream<String> s1 = PlumbingStreams.blockingOneShotDelay(
- t.collection(msgs1), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-
- TStream<String> s2 = PlumbingStreams.blockingOneShotDelay(
- t.collection(msgs2), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-
- Map<String,Object> pConfig = newProducerConfig();
- KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-
- TSink<String> sink1 = producer.publish(s1, topic1);
- TSink<String> sink2 = producer.publish(s2, topic2);
-
- Map<String,Object> cConfig = newConsumerConfig(groupId);
- KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-
- TStream<String> rcvd = consumer.subscribe(
- rec -> rec.value(),
- topic1, topic2);
-
- completeAndValidate(false/*ordered*/, "", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
-
- assertNotNull(sink1);
- assertNotNull(sink2);
- assertNotSame(sink1, sink2);
- }
-
- @Test(expected=IllegalStateException.class)
- public void testMultiSubNeg() throws Exception {
- Topology t = newTopology("testMultiSubNeg");
- MsgGenerator mgen = new MsgGenerator(t.getName());
- String topic1 = getKafkaTopics()[0];
- String topic2 = getKafkaTopics()[1];
- String groupId = newGroupId(t.getName());
- List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
- List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
-
- // Multiple subscribe() on a single connection.
- // Currently, w/Kafka0.8.2.2, we only support a single
- // subscriber on the connection and an IllegalStateException
- // is thrown.
- // This restriction will be removed when we migrate to Kafka 0.9.0.0
-
- TStream<String> s1 = PlumbingStreams.blockingOneShotDelay(
- t.collection(msgs1), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-
- TStream<String> s2 = PlumbingStreams.blockingOneShotDelay(
- t.collection(msgs2), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-
- Map<String,Object> pConfig = newProducerConfig();
- KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-
- producer.publish(s1, topic1);
- producer.publish(s2, topic2);
-
- Map<String,Object> cConfig = newConsumerConfig(groupId);
- KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-
- @SuppressWarnings("unused")
- TStream<String> rcvd1 = consumer.subscribe(
- rec -> rec.value(),
- topic1);
-
- @SuppressWarnings("unused")
- TStream<String> rcvd2 = consumer.subscribe(
- rec -> rec.value(),
- topic2);
-
- // TODO see "single subscribe" restriction above
-
-// // TODO union() is NYI
-//// TStream<String> rcvd = rcvd1.union(rcvd2);
-////
-//// completeAndValidate(false/*ordered*/, "", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
-//
-// Condition<Long> tc1 = t.getTester().tupleCount(rcvd1, msgs1.size());
-// Condition<Long> tc2 = t.getTester().tupleCount(rcvd2, msgs2.size());
-//
-// List<Condition<Long>> conditions = new ArrayList<>();
-// conditions.add(tc1);
-// conditions.add(tc2);
-// Condition<?> tc = tc1.and(tc2);
-//
-// Condition<List<String>> contents1 = t.getTester().streamContents(rcvd1, msgs1.toArray(new String[0]));
-// Condition<List<String>> contents2 = t.getTester().streamContents(rcvd2, msgs2.toArray(new String[0]));
-//
-// complete(t, tc, SEC_TIMEOUT, TimeUnit.SECONDS);
-//
-// assertTrue(groupId + " contents1:" + contents1.getResult(), contents1.valid());
-// assertTrue(groupId + " contents2:" + contents2.getResult(), contents2.valid());
-// assertTrue("valid:" + tc, tc.valid());
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testNoTopicSubNeg() throws Exception {
- Topology t = newTopology("testNoTopicSubNeg");
- String groupId = newGroupId(t.getName());
-
- Map<String,Object> cConfig = newConsumerConfig(groupId);
- KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-
- consumer.subscribe(rec -> rec.value()/*, "topic1"*/);
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testDupTopicSub1Neg() throws Exception {
- Topology t = newTopology("testDupTopicSub1Neg");
- String groupId = newGroupId(t.getName());
-
- Map<String,Object> cConfig = newConsumerConfig(groupId);
- KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-
- consumer.subscribe(rec -> rec.value(), "topic1", "topic1");
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testDupTopicSub2Neg() throws Exception {
- Topology t = newTopology("testDupTopicSub2Neg");
- String groupId = newGroupId(t.getName());
-
- Map<String,Object> cConfig = newConsumerConfig(groupId);
- KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-
- consumer.subscribe(rec -> rec.value(), "topic1");
- consumer.subscribe(rec -> rec.value(), "topic1");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
new file mode 100644
index 0000000..3c45b68
--- /dev/null
+++ b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.kafka;
+
+/**
+ * KafkaStreams connector globalization tests.
+ */
+public class KafkaStreamsGlobalTestManual extends KafkaStreamsTestManual {
+ private static final String globalMsg1 = "\u4f60\u597d";
+ private static final String globalMsg2 = "\u4f60\u5728\u55ce";
+
+ public String getMsg1() {
+ return globalMsg1;
+ }
+
+ public String getMsg2() {
+ return globalMsg2;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
new file mode 100644
index 0000000..4328af8
--- /dev/null
+++ b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
@@ -0,0 +1,365 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.kafka;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.kafka.KafkaConsumer;
+import org.apache.edgent.connectors.kafka.KafkaProducer;
+import org.apache.edgent.test.connectors.common.ConnectorTestBase;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.junit.Test;
+
+public class KafkaStreamsTestManual extends ConnectorTestBase {
+ private static final int PUB_DELAY_MSEC = 4*1000;
+ private static final int SEC_TIMEOUT = 10;
+ private final String BASE_GROUP_ID = "kafkaStreamsTestGroupId";
+ private final String uniq = simpleTS();
+ private final String msg1 = "Hello";
+ private final String msg2 = "Are you there?";
+
+ public String getMsg1() {
+ return msg1;
+ }
+
+ public String getMsg2() {
+ return msg2;
+ }
+
+ private String[] getKafkaTopics() {
+ String csvTopics = System.getProperty("org.apache.edgent.test.connectors.kafka.csvTopics", "testTopic1,testTopic2");
+ String[] topics = csvTopics.split(",");
+ return topics;
+ }
+
+ private String getKafkaBootstrapServers() {
+ return System.getProperty("org.apache.edgent.test.connectors.kafka.bootstrap.servers", "localhost:9092");
+ }
+
+ private String getKafkaZookeeperConnect() {
+ return System.getProperty("org.apache.edgent.test.connectors.kafka.zookeeper.connect", "localhost:2181");
+ }
+
+ private String newGroupId(String name) {
+ String groupId = BASE_GROUP_ID + "_" + name + "_" + uniq.replaceAll(":", "");
+ System.out.println("["+simpleTS()+"] "
+ + "Using Kafka consumer group.id " + groupId);
+ return groupId;
+ }
+
+ private Map<String,Object> newConsumerConfig(String groupId) {
+ Map<String,Object> config = new HashMap<>();
+ // unbaked 8.2.2 KafkaConsumer
+// config.put("bootstrap.servers", getKafkaBootstrapServers());
+ config.put("zookeeper.connect", getKafkaZookeeperConnect());
+ config.put("group.id", groupId);
+ return config;
+ }
+
+ private Map<String,Object> newProducerConfig() {
+ Map<String,Object> config = new HashMap<>();
+ config.put("bootstrap.servers", getKafkaBootstrapServers());
+ return config;
+ }
+
+ private static class Rec {
+ String topic;
+ int partition;
+ String key;
+ String value;
+ Rec(String topic, int partition, String key, String value) {
+ this.topic = topic;
+ this.partition = partition;
+ this.key = key;
+ this.value = value;
+ }
+ public String toString() {
+ return "topic:"+topic+" partition:"+partition+" key:"+key+" value:"+value;
+ }
+ }
+
+ @Test
+ public void testSimple() throws Exception {
+ Topology t = newTopology("testSimple");
+ MsgGenerator mgen = new MsgGenerator(t.getName());
+ String topic = getKafkaTopics()[0];
+ String groupId = newGroupId(t.getName());
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
+
+ TStream<String> s = PlumbingStreams.blockingOneShotDelay(
+ t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+ Map<String,Object> pConfig = newProducerConfig();
+ KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+
+ TSink<String> sink = producer.publish(s, topic);
+
+ Map<String,Object> cConfig = newConsumerConfig(groupId);
+ KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+
+ TStream<String> rcvd = consumer.subscribe(
+ rec -> rec.value(),
+ topic);
+
+ completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
+
+ assertNotNull(sink);
+ }
+
+ @Test
+ public void testWithKey() throws Exception {
+ Topology t = newTopology("testWithKey");
+ MsgGenerator mgen = new MsgGenerator(t.getName());
+ String topic = getKafkaTopics()[0];
+ String groupId = newGroupId(t.getName());
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
+ List<Rec> recs = new ArrayList<>();
+ int i = 0;
+ for (String msg : msgs) {
+ recs.add(new Rec(topic, 0, "key-" + ++i, msg));
+ }
+ List<String> expected = new ArrayList<>();
+ for (Rec rec : recs) {
+ expected.add(rec.toString());
+ }
+
+ // Test publish with key
+ // Also exercise ConsumerRecord accessors
+
+ TStream<Rec> s = PlumbingStreams.blockingOneShotDelay(
+ t.collection(recs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+ Map<String,Object> pConfig = newProducerConfig();
+ KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+
+ producer.publish(s,
+ tuple -> tuple.key,
+ tuple -> tuple.value,
+ tuple -> tuple.topic,
+ tuple -> tuple.partition);
+
+ Map<String,Object> cConfig = newConsumerConfig(groupId);
+ KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+
+ TStream<String> rcvd = consumer.subscribe(
+ rec -> new Rec(rec.topic(),
+ rec.partition(),
+ rec.key(),
+ rec.value()).toString(),
+ topic);
+
+ completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
+ }
+
+ @Test
+ public void testPubSubBytes() throws Exception {
+ Topology t = newTopology("testPubSubBytes");
+ MsgGenerator mgen = new MsgGenerator(t.getName());
+ String topic = getKafkaTopics()[0];
+ String groupId = newGroupId(t.getName());
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
+ List<Rec> recs = new ArrayList<>();
+ int i = 0;
+ for (String msg : msgs) {
+ recs.add(new Rec(topic, 0, "key-" + ++i, msg));
+ }
+ List<String> expected = new ArrayList<>();
+ for (Rec rec : recs) {
+ expected.add(rec.toString());
+ }
+
+ // Test publishBytes() / subscribeBytes()
+
+ TStream<Rec> s = PlumbingStreams.blockingOneShotDelay(
+ t.collection(recs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+ Map<String,Object> pConfig = newProducerConfig();
+ KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+
+ producer.publishBytes(s,
+ tuple -> tuple.key.getBytes(StandardCharsets.UTF_8),
+ tuple -> tuple.value.getBytes(StandardCharsets.UTF_8),
+ tuple -> tuple.topic,
+ tuple -> tuple.partition);
+
+ Map<String,Object> cConfig = newConsumerConfig(groupId);
+ KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+
+ TStream<String> rcvd = consumer.subscribeBytes(
+ rec -> new Rec(rec.topic(),
+ rec.partition(),
+ new String(rec.key(), StandardCharsets.UTF_8),
+ new String(rec.value(), StandardCharsets.UTF_8)).toString(),
+ topic);
+
+ completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
+ }
+
+ @Test
+ public void testMultiPub() throws Exception {
+ Topology t = newTopology("testMultiPub");
+ MsgGenerator mgen = new MsgGenerator(t.getName());
+ String topic1 = getKafkaTopics()[0];
+ String topic2 = getKafkaTopics()[1];
+ String groupId = newGroupId(t.getName());
+ List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+ List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
+ List<String> msgs = new ArrayList<>(msgs1);
+ msgs.addAll(msgs2);
+
+ // Multiple publish() on a single connection.
+ // Also multi-topic subscribe().
+
+ TStream<String> s1 = PlumbingStreams.blockingOneShotDelay(
+ t.collection(msgs1), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+ TStream<String> s2 = PlumbingStreams.blockingOneShotDelay(
+ t.collection(msgs2), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+ Map<String,Object> pConfig = newProducerConfig();
+ KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+
+ TSink<String> sink1 = producer.publish(s1, topic1);
+ TSink<String> sink2 = producer.publish(s2, topic2);
+
+ Map<String,Object> cConfig = newConsumerConfig(groupId);
+ KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+
+ TStream<String> rcvd = consumer.subscribe(
+ rec -> rec.value(),
+ topic1, topic2);
+
+ completeAndValidate(false/*ordered*/, "", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
+
+ assertNotNull(sink1);
+ assertNotNull(sink2);
+ assertNotSame(sink1, sink2);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testMultiSubNeg() throws Exception {
+ Topology t = newTopology("testMultiSubNeg");
+ MsgGenerator mgen = new MsgGenerator(t.getName());
+ String topic1 = getKafkaTopics()[0];
+ String topic2 = getKafkaTopics()[1];
+ String groupId = newGroupId(t.getName());
+ List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+ List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
+
+ // Multiple subscribe() on a single connection.
+ // Currently, with Kafka 0.8.2.2, we only support a single
+ // subscriber on the connection and an IllegalStateException
+ // is thrown.
+ // This restriction will be removed when we migrate to Kafka 0.9.0.0.
+
+ TStream<String> s1 = PlumbingStreams.blockingOneShotDelay(
+ t.collection(msgs1), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+ TStream<String> s2 = PlumbingStreams.blockingOneShotDelay(
+ t.collection(msgs2), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+ Map<String,Object> pConfig = newProducerConfig();
+ KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+
+ producer.publish(s1, topic1);
+ producer.publish(s2, topic2);
+
+ Map<String,Object> cConfig = newConsumerConfig(groupId);
+ KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+
+ @SuppressWarnings("unused")
+ TStream<String> rcvd1 = consumer.subscribe(
+ rec -> rec.value(),
+ topic1);
+
+ @SuppressWarnings("unused")
+ TStream<String> rcvd2 = consumer.subscribe(
+ rec -> rec.value(),
+ topic2);
+
+ // TODO see "single subscribe" restriction above
+
+// // TODO union() is NYI
+//// TStream<String> rcvd = rcvd1.union(rcvd2);
+////
+//// completeAndValidate(false/*ordered*/, "", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
+//
+// Condition<Long> tc1 = t.getTester().tupleCount(rcvd1, msgs1.size());
+// Condition<Long> tc2 = t.getTester().tupleCount(rcvd2, msgs2.size());
+//
+// List<Condition<Long>> conditions = new ArrayList<>();
+// conditions.add(tc1);
+// conditions.add(tc2);
+// Condition<?> tc = tc1.and(tc2);
+//
+// Condition<List<String>> contents1 = t.getTester().streamContents(rcvd1, msgs1.toArray(new String[0]));
+// Condition<List<String>> contents2 = t.getTester().streamContents(rcvd2, msgs2.toArray(new String[0]));
+//
+// complete(t, tc, SEC_TIMEOUT, TimeUnit.SECONDS);
+//
+// assertTrue(groupId + " contents1:" + contents1.getResult(), contents1.valid());
+// assertTrue(groupId + " contents2:" + contents2.getResult(), contents2.valid());
+// assertTrue("valid:" + tc, tc.valid());
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testNoTopicSubNeg() throws Exception {
+ Topology t = newTopology("testNoTopicSubNeg");
+ String groupId = newGroupId(t.getName());
+
+ Map<String,Object> cConfig = newConsumerConfig(groupId);
+ KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+
+ consumer.subscribe(rec -> rec.value()/*, "topic1"*/);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testDupTopicSub1Neg() throws Exception {
+ Topology t = newTopology("testDupTopicSub1Neg");
+ String groupId = newGroupId(t.getName());
+
+ Map<String,Object> cConfig = newConsumerConfig(groupId);
+ KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+
+ consumer.subscribe(rec -> rec.value(), "topic1", "topic1");
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testDupTopicSub2Neg() throws Exception {
+ Topology t = newTopology("testDupTopicSub2Neg");
+ String groupId = newGroupId(t.getName());
+
+ Map<String,Object> cConfig = newConsumerConfig(groupId);
+ KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+
+ consumer.subscribe(rec -> rec.value(), "topic1");
+ consumer.subscribe(rec -> rec.value(), "topic1");
+ }
+
+}