You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:24 UTC

[23/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/KafkaProducer.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/KafkaProducer.java
new file mode 100644
index 0000000..103d61a
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/KafkaProducer.java
@@ -0,0 +1,168 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import org.apache.edgent.connectors.kafka.runtime.KafkaProducerConnector;
+import org.apache.edgent.connectors.kafka.runtime.KafkaPublisher;
+
+/**
+ * {@code KafkaProducer} is a connector for publishing a stream of tuples
+ * to Apache Kafka messaging system topics.
+ * <p>
+ * The connector uses and includes components from the Kafka 0.8.2.2 release.
+ * It has been successfully tested against a kafka_2.11-0.9.0.0 server as well.
+ * For more information about Kafka see
+ * <a href="http://kafka.apache.org">http://kafka.apache.org</a>
+ * <p>
+ * Sample use:
+ * <pre>{@code
+ * String bootstrapServers = "localhost:9092";
+ * String topic = "mySensorReadingsTopic";
+ * 
+ * Map<String,Object> config = new HashMap<>();
+ * config.put("bootstrap.servers", bootstrapServers);
+ * 
+ * Topology t = ...
+ * KafkaProducer kafka = new KafkaProducer(t, () -> config);
+ * 
+ * TStream<JsonObject> sensorReadings = t.poll(
+ *              () -> getSensorReading(), 5, TimeUnit.SECONDS);
+ *              
+ * // publish the sensor readings as JSON
+ * kafka.publish(sensorReadings, null, tuple -> tuple.toString(), tuple -> topic, null);
+ * }</pre>
+ */
+public class KafkaProducer {
+    @SuppressWarnings("unused")
+    private final Topology t;
+    private final KafkaProducerConnector connector;
+   
+    /**
+     * Create a producer connector for publishing tuples to Kafka topics.
+     * <p>
+     * See the Apache Kafka documentation for {@code KafkaProducer}
+     * configuration properties at <a href="http://kafka.apache.org">http://kafka.apache.org</a>.
+     * Configuration property values are strings.
+     * <p>
+     * The Kafka "New Producer configs" are used.  Minimal configuration
+     * typically includes:
+     * <ul>
+     * <li><code>bootstrap.servers</code></li>
+     * </ul>
+     * <p>
+     * The full set of producer configuration items is specified in
+     * {@code org.apache.kafka.clients.producer.ProducerConfig}.
+     * 
+     * @param t Topology to add to
+     * @param config KafkaProducer configuration information.
+     */
+    public KafkaProducer(Topology t, Supplier<Map<String,Object>> config) {
+        this.t = t;
+        connector = new KafkaProducerConnector(config);
+    }
+
+    /**
+     * Publish the stream of tuples as Kafka key/value records
+     * to the specified topic partitions.
+     * <p>
+     * If a valid partition number is specified that partition will be used
+     * when sending the message.  If no partition is specified but a key is
+     * present a partition will be chosen using a hash of the key.
+     * If neither key nor partition is present a partition will be assigned
+     * in a round-robin fashion.
+     * 
+     * @param <T> Tuple type
+     * @param stream the stream to publish
+     * @param keyFn A function that yields an optional byte[] 
+     *        Kafka record's key from the tuple.
+     *        Specify null or return a null value for no key.
+     * @param valueFn A function that yields the byte[]
+     *        Kafka record's value from the tuple.
+     * @param topicFn A function that yields the topic from the tuple.
+     * @param partitionFn A function that yields the optional topic
+     *        partition specification from the tuple.
+     *        Specify null or return a null value for no partition specification.
+     * @return {@link TSink}
+     */
+    public <T> TSink<T> publishBytes(TStream<T> stream, Function<T,byte[]> keyFn, Function<T,byte[]> valueFn, Function<T,String> topicFn, Function<T,Integer> partitionFn) {
+        return stream.sink(new KafkaPublisher<T>(connector, keyFn, valueFn, topicFn, partitionFn));
+    }
+    
+    /**
+     * Publish the stream of tuples as Kafka key/value records
+     * to the specified partitions of the specified topics.
+     * <p>
+     * This is a convenience method for {@code String} typed key/value
+     * conversion functions.
+     * <p>
+     * @param <T> Tuple type
+     * @param stream the stream to publish
+     * @param keyFn A function that yields an optional String 
+     *        Kafka record's key from the tuple.
+     *        Specify null or return a null value for no key.
+     * @param valueFn A function that yields the String for the
+     *        Kafka record's value from the tuple.
+     * @param topicFn A function that yields the topic from the tuple.
+     * @param partitionFn A function that yields the optional topic
+     *        partition specification from the tuple.
+     *        Specify null or return a null value for no partition specification.
+     * @return {@link TSink}
+     * @see #publishBytes(TStream, Function, Function, Function, Function)
+     */
+    public <T> TSink<T> publish(TStream<T> stream, Function<T,String> keyFn, Function<T,String> valueFn, Function<T,String> topicFn, Function<T,Integer> partitionFn) {
+        Function<T,byte[]> keyFn2 = null;
+        if (keyFn != null) {
+            keyFn2 = tuple -> { String key = keyFn.apply(tuple);
+                                return key==null
+                                        ? null
+                                        : key.getBytes(StandardCharsets.UTF_8);
+                              };
+        }
+        return publishBytes(stream, keyFn2,
+                tuple -> valueFn.apply(tuple).getBytes(StandardCharsets.UTF_8), 
+                topicFn, partitionFn);
+    }
+
+    /**
+     * Publish the stream of tuples as Kafka key/value records
+     * to the specified partitions of the specified topics.
+     * <p>
+     * This is a convenience method for a String stream published
+     * as a Kafka record with no key and
+     * a value consisting of the String tuple serialized as UTF-8,
+     * and publishing round-robin to a fixed topic's partitions.
+     * 
+     * @param stream the stream to publish
+     * @param topic The topic to publish to
+     * @return {@link TSink}
+     * @see #publish(TStream, Function, Function, Function, Function)
+     */
+    public TSink<String> publish(TStream<String> stream, String topic) {
+        return publish(stream, null, tuple -> tuple, tuple -> topic, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/package-info.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/package-info.java
new file mode 100644
index 0000000..dd1689b
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/package-info.java
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Apache Kafka enterprise messaging hub stream connector.
+ * <p>
+ * The connector uses and includes components from the Kafka 0.8.2.2 release.
+ * It has been successfully tested against a kafka_2.11-0.9.0.0 server as well.
+ * <p>
+ * Stream tuples may be published to Kafka broker topics
+ * and created by subscribing to broker topics.
+ * For more information about Kafka see
+ * <a href="http://kafka.apache.org">http://kafka.apache.org</a>
+ */
+package org.apache.edgent.connectors.kafka;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConnector.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConnector.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConnector.java
new file mode 100644
index 0000000..0575b13
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConnector.java
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConnector implements Serializable {
+    private static final long serialVersionUID = 1L;
+    protected static final Logger trace = LoggerFactory.getLogger(KafkaConnector.class);
+    
+    static Logger getTrace() {
+        return trace;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
new file mode 100644
index 0000000..f5f879f
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
@@ -0,0 +1,263 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.function.Supplier;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+/**
+ * A connector for consuming Kafka key/value records.
+ */
+public class KafkaConsumerConnector extends KafkaConnector {
+    private static final long serialVersionUID = 1L;
+    private final Supplier<Map<String,Object>> configFn;
+    private String id;
+    // Map<subscriber,List<topic>>>
+    private final Map<KafkaSubscriber<?>,List<String>> subscribers = new HashMap<>();
+    private ConsumerConnector consumer;
+    private final Map<KafkaSubscriber<?>,ExecutorService> executors = new HashMap<>();
+    
+    // Ugh. Turns out the new KafkaConsumer present in Kafka 0.8.2.2 isn't baked yet
+    // (e.g., its poll() just returns null).
+
+    public KafkaConsumerConnector(Supplier<Map<String, Object>> configFn) {
+        this.configFn = configFn;
+    }
+    
+    // unbaked 8.2.2 KafkaConsumer
+//    private synchronized KafkaConsumer<byte[],byte[]> client() {
+//        if (consumer == null)
+//            consumer = new KafkaConsumer<byte[],byte[]>(configFn.get(),
+//                    null, /*ConsumerRebalanceCallaback*/
+//                    new ByteArrayDeserializer(), new ByteArrayDeserializer());
+//        return consumer;
+//    }
+    
+    private synchronized ConsumerConnector client() {
+        if (consumer == null)
+            consumer = Consumer.createJavaConsumerConnector(
+                                                createConsumerConfig());
+        return consumer;
+    }
+    
+    private ConsumerConfig createConsumerConfig() {
+        Map<String,Object> config = configFn.get();
+        Properties props = new Properties();
+        for (Entry<String,Object> e : config.entrySet()) {
+            props.put(e.getKey(), e.getValue().toString());
+        }
+        return new ConsumerConfig(props);
+    }
+
+    public synchronized void close(KafkaSubscriber<?> subscriber) {
+        trace.trace("{} closing subscriber {}", id(), subscriber);
+        // TODO hmm... really want to do consumer.shutdown() first
+        // to avoid InterruptedException from shutdown[Now] of
+        // consumer threads (in it.next()).
+        // Our issue is that we can have multiple Subscribers for a
+        // single ConsumerConnector.
+        // Look at streams.messaging to see how it handles this - not
+        // sure it does (e.g., may have only a single operator for a
+        // ConsumerConnector).
+        try {
+            ExecutorService executor = executors.remove(subscriber);
+            if (executor != null) {
+                executor.shutdownNow();
+                executor.awaitTermination(5, TimeUnit.SECONDS);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        finally {
+            if (executors.isEmpty()) {
+                trace.info("{} closing consumer", id());
+                if (consumer != null)
+                    consumer.shutdown();
+            }
+        }
+    }
+    
+    synchronized void addSubscriber(KafkaSubscriber<?> subscriber, String... topics) {
+        List<String> topicList = new ArrayList<>(Arrays.asList(topics));
+        checkSubscription(subscriber, topicList);
+        {
+            // In Kafka 0.8.2.2, ConsumerConnector.createMessageStreams() can
+            // only be called once for a connector instance.
+            // The analogous operation in Kafka 0.9.0.0 doesn't have such a
+            // restriction so for now just enforce a restriction rather than
+            // do the work to make this appear to work.
+            if (!subscribers.isEmpty())
+                throw new IllegalStateException("The KafkaConsumer connection already has a subscriber");
+        }
+        subscribers.put(subscriber, topicList);
+    }
+    
+   // unbaked 8.2.2 KafkaConsumer
+//   synchronized void addSubscriber(KafkaSubscriber<?> subscriber, TopicPartition... topicPartitions) {
+//        checkSubscription(subscriber, (Object[]) topicPartitions);
+//        isTopicSubscriptions = false;
+//        for (TopicPartition topicPartition : topicPartitions) {
+//            trace.info("{} addSubscriber for {}", id(), topicPartition);
+//            subscribers.put(subscriber, topicPartition);
+//        }
+//    }
+    
+    private void checkSubscription(KafkaSubscriber<?> subscriber, List<String> topics) {
+        if (topics.size() == 0)
+            throw new IllegalArgumentException("Subscription specification is empty");
+        
+        // disallow dup subscriptions
+        Set<String> topicSet = new HashSet<>(topics);
+        if (topicSet.size() != topics.size())
+            throw new IllegalArgumentException("Duplicate subscription");
+        
+        // check against existing subscriptions
+        topicSet.clear();
+        for (List<String> l : subscribers.values())
+            topicSet.addAll(l);
+        for (String topic : topics) {
+            if (topicSet.contains(topic))
+                throw new IllegalArgumentException("Duplicate subscription");
+        }
+    }
+    
+    synchronized void start(KafkaSubscriber<?> subscriber) {
+        Map<String,Integer> topicCountMap = new HashMap<>();
+        int threadsPerTopic = 1;
+        int totThreadCnt = 0;
+        List<String> topics = subscribers.get(subscriber);
+        for (String topic : topics) {
+            topicCountMap.put(topic,  threadsPerTopic);
+            totThreadCnt += threadsPerTopic;
+        }
+        
+        Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap =
+                client().createMessageStreams(topicCountMap);
+        
+        ExecutorService executor = Executors.newFixedThreadPool(totThreadCnt);
+        executors.put(subscriber, executor);
+        
+        for (Entry<String,List<KafkaStream<byte[],byte[]>>> entry : consumerMap.entrySet()) {
+            String topic = entry.getKey();
+            int threadNum = 0;
+            for (KafkaStream<byte[],byte[]> stream : entry.getValue()) {
+                executor.submit(() -> {
+                    try {
+                        trace.info("{} started consumer thread {} for topic:{}", id(), threadNum, topic);
+                        ConsumerIterator<byte[],byte[]> it = stream.iterator();
+                        while (it.hasNext()) {
+                            subscriber.accept(it.next());
+                        }
+                    }
+                    catch (Throwable t) {
+                        if (t instanceof InterruptedException) {
+                            // normal close() termination
+                            trace.trace("{} consumer for topic:{}. got exception", id(), topic, t);
+                        }
+                        else
+                            trace.error("{} consumer for topic:{}. got exception", id(), topic, t);
+                    }
+                    finally {
+                        trace.info("{} consumer thread {} for topic:{} exiting.", id(), threadNum, topic);
+                    }
+                });
+            }
+        }
+    }
+
+    // unbaked 8.2.2 KafkaConsumer
+//    synchronized void start(KafkaSubscriber<?> subscriber) {
+//        List<Object> subscriptions = subscribers.get(subscriber);
+//        trace.info("{} adding subscription for {}", id(), subscriptions);
+//        if (subscriptions.get(0) instanceof String)
+//            client().subscribe(subscriptions.toArray(new String[0]));
+//        else
+//            client().subscribe(subscriptions.toArray(new TopicPartition[0]));
+//        
+//        if (pollFuture == null) {
+//            pollFuture = executor.submit(new Runnable() {
+//                @Override
+//                public void run() {
+//                    KafkaConsumerConnector.this.run();
+//                }
+//            });
+//        }
+//    }
+//    
+//    private void run() {
+//        trace.info("{} poll thread running", id());
+//        while (true) {
+//            if (Thread.interrupted()) {
+//                trace.info("{} poll thread terinating", id());
+//                return;
+//            }
+//            
+//            fetchAndProcess();
+//        }
+//    }
+//    
+//    private void fetchAndProcess() {
+//        Map<String, ConsumerRecords<byte[],byte[]>> map = client().poll(2*1000);
+//        
+//        for (Entry<String,ConsumerRecords<byte[],byte[]>> e : map.entrySet()) {
+//            KafkaSubscriber<?> subscriber = subscribers.get(e.getKey());
+//            if (subscriber != null) {
+//                for (ConsumerRecord<byte[],byte[]> rec : e.getValue().records()) {
+//                    trace.info/*trace*/("{} processing record for {}", id(), rec.topicAndPartition());
+//                    subscriber.accept(rec);
+//                }
+//            }
+//            else {
+//                // must be TopicPartition based subscription
+//                for (ConsumerRecord<byte[],byte[]> rec : e.getValue().records()) {
+//                    subscriber = subscribers.get(rec.topicAndPartition());
+//                    trace.info/*trace*/("{} processing record for {}", id(), rec.topicAndPartition());
+//                    subscriber.accept(rec);
+//                }
+//            }
+//        }
+//    }
+    
+    String id() {
+        if (id == null) {
+            // include our short object Id
+            id = "Kafka " + toString().substring(toString().indexOf('@') + 1);
+        }
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaProducerConnector.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaProducerConnector.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaProducerConnector.java
new file mode 100644
index 0000000..852cf70
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaProducerConnector.java
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import java.util.Map;
+
+import org.apache.edgent.function.Supplier;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+/**
+ * A connector for producing/publishing Kafka key/value records.
+ */
+public class KafkaProducerConnector extends KafkaConnector implements AutoCloseable {
+    private static final long serialVersionUID = 1L;
+    private final Supplier<Map<String,Object>> configFn;
+    private String id;
+    private KafkaProducer<byte[],byte[]> producer;
+
+    public KafkaProducerConnector(Supplier<Map<String, Object>> configFn) {
+        this.configFn = configFn;
+    }
+    
+    synchronized KafkaProducer<byte[],byte[]> client() {
+        if (producer == null)
+            producer = new KafkaProducer<byte[],byte[]>(configFn.get(),
+                    new ByteArraySerializer(), new ByteArraySerializer());
+        return producer;
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (producer != null)
+            producer.close();
+    }
+    
+    String id() {
+        if (id == null) {
+            // include our short object Id
+            id = "Kafka " + toString().substring(toString().indexOf('@') + 1);
+        }
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaPublisher.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaPublisher.java
new file mode 100644
index 0000000..8d08262
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaPublisher.java
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+
+/**
+ * A publisher of Kafka key/value records from a stream of tuples of type {@code T}
+ *
+ * @param <T> tuple type
+ */
+public class KafkaPublisher<T> implements Consumer<T>, AutoCloseable {
+    private static final long serialVersionUID = 1L;
+    private static final Logger trace = KafkaProducerConnector.getTrace();
+    private final Function<T, byte[]> keyFn;
+    private final Function<T, byte[]> valueFn;
+    private final Function<T, String> topicFn;
+    private final Function<T, Integer> partitionFn;
+    private String id;
+    private KafkaProducerConnector connector;
+
+    public KafkaPublisher(KafkaProducerConnector connector, Function<T, byte[]> keyFn,
+            Function<T, byte[]> valueFn, Function<T, String> topicFn,
+            Function<T, Integer> partitionFn) {
+        
+        this.connector = connector;
+        if (keyFn == null)
+            keyFn = tuple -> null;
+        this.keyFn = keyFn;
+        this.valueFn = valueFn;
+        this.topicFn = topicFn;
+        if (partitionFn == null)
+            partitionFn = tuple -> null;
+        this.partitionFn = partitionFn;
+    }
+
+    @Override
+    public void accept(T t) {
+        String topic = topicFn.apply(t);
+        Integer partition = partitionFn.apply(t);
+        byte[] key = keyFn.apply(t);
+        byte[] value = valueFn.apply(t);
+        ProducerRecord<byte[],byte[]> rec = new ProducerRecord<>(
+                topic, partition, key, value);
+
+        trace.trace("{} sending rec to topic:{} partition:{}", id(), topic, partition);
+        
+        // TODO add callback for trace of actual completion?
+        
+        connector.client().send(rec);  // async; doesn't throw
+    }
+
+    @Override
+    public void close() throws Exception {
+        connector.close();
+    }
+    
+    private String id() {
+        if (id == null) {
+            // include our short object Id
+            id = connector.id() + " PUB " + toString().substring(toString().indexOf('@') + 1);
+        }
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber.java
new file mode 100644
index 0000000..0a652e7
--- /dev/null
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber.java
@@ -0,0 +1,187 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.kafka.runtime;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+
+import org.apache.edgent.connectors.kafka.KafkaConsumer;
+import kafka.message.MessageAndMetadata;
+
+/**
+ * A consumer of Kafka key/value records that generates tuples of type {@code T}
+ *
+ * @param <T> tuple type
+ */
+public class KafkaSubscriber<T> implements Consumer<Consumer<T>>, AutoCloseable {
+    private static final long serialVersionUID = 1L;
+    private static final Logger trace = KafkaConsumerConnector.getTrace();
+    /** Lazily built identifier used in trace messages; see {@code id()}. */
+    private String id;
+    private final KafkaConsumerConnector connector;
+    /** Set when created with {@code isStringFn == false}; otherwise null. */
+    private Function<ByteConsumerRecord, T> byteToTupleFn;
+    /** Set when created with {@code isStringFn == true}; otherwise null. */
+    private Function<StringConsumerRecord, T> stringToTupleFn;
+    private final String[] topics;
+    /** Receives the tuples generated from records; set by {@code accept(Consumer)}. */
+    private Consumer<T> eventSubmitter;
+
+    /**
+     * Create a subscriber and register it with {@code connector} for {@code topics}.
+     *
+     * @param connector connector this subscriber registers with
+     * @param toTupleFn converts a received record to a tuple
+     * @param isStringFn true if {@code toTupleFn} takes String key/value records,
+     *        false if it takes byte[] key/value records
+     * @param topics the topics to subscribe to
+     */
+    @SuppressWarnings("unchecked")
+    public KafkaSubscriber(KafkaConsumerConnector connector, Function<? extends KafkaConsumer.ConsumerRecord<?,?>, T> toTupleFn, boolean isStringFn, String... topics) {
+        this.connector = connector;
+        if (isStringFn)
+            stringToTupleFn = (Function<StringConsumerRecord, T>) toTupleFn;
+        else
+            byteToTupleFn = (Function<ByteConsumerRecord, T>) toTupleFn;
+        this.topics = topics;
+        connector.addSubscriber(this, this.topics);
+    }
+
+    // The explicit topicPartition style of subscription is part of
+    // Kafka's new KafkaConsumer API, which is unbaked as of 0.8.2.2
+//    @SuppressWarnings("unchecked")
+//    public KafkaSubscriber(KafkaConsumerConnector connector, Function<? extends KafkaConsumer.ConsumerRecord<?,?>,T> toTupleFn, boolean isStringFn, KafkaConsumer.TopicPartition... topicPartitions) {
+//        this.connector = connector;
+//        if (isStringFn)
+//            stringToTupleFn = (Function<KafkaSubscriber<T>.StringConsumerRecord, T>) toTupleFn;
+//        else
+//            byteToTupleFn = (Function<KafkaSubscriber<T>.ByteConsumerRecord, T>) toTupleFn;
+//        this.topics = null;
+//        TopicPartition[] tps = new TopicPartition[topicPartitions.length];
+//        int i = 0;
+//        for (KafkaConsumer.TopicPartition tp : topicPartitions)
+//            tps[i++] = new TopicPartition(tp.topic(), tp.partition());
+//        this.topicPartitions = tps;
+//        connector.addSubscriber(this, this.topicPartitions);
+//    }
+
+    /**
+     * Remember the tuple submitter and start this subscriber on the connector.
+     * Any failure while starting is logged and not propagated.
+     *
+     * @param eventSubmitter receives each tuple generated from a record
+     */
+    @Override
+    public void accept(Consumer<T> eventSubmitter) {
+        try {
+            this.eventSubmitter = eventSubmitter;
+            connector.start(this);
+        }
+        catch (Throwable t) {
+            trace.error("{} initialization failure", id(), t);
+        }
+    }
+
+    // unbaked 0.8.2.2 KafkaConsumer
+//    void accept(ConsumerRecord<byte[],byte[]> rec) {
+//        if (rec.error()) {
+//            trace.error("{} error retrieving record.", id(), rec.error());
+//            return;
+//        }
+//        try {
+//            T tuple;
+//            if (stringToTupleFn != null)
+//                tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
+//            else
+//                tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
+//            eventSubmitter.accept(tuple);
+//        }
+//        catch (Exception e) {
+//            trace.error("{} failure processing record from {}", id(), rec.topicAndPartition(), e);
+//        }
+//    }
+
+    /**
+     * Convert a received record to a tuple and submit it.
+     * A failure in the conversion function or the submitter is logged
+     * together with the record's topic and partition; it is not propagated.
+     *
+     * @param rec the received record
+     */
+    void accept(MessageAndMetadata<byte[],byte[]> rec) {
+        try {
+            trace.trace("{} received rec for topic:{} partition:{} offset:{}",
+                        id(), rec.topic(), rec.partition(), rec.offset());
+            T tuple;
+            if (stringToTupleFn != null)
+                tuple = stringToTupleFn.apply(new StringConsumerRecord(rec));
+            else
+                tuple = byteToTupleFn.apply(new ByteConsumerRecord(rec));
+            eventSubmitter.accept(tuple);
+        }
+        catch (Exception e) {
+            String tp = String.format("[%s,%d]", rec.topic(), rec.partition());
+            trace.error("{} failure processing record from {}", id(), tp, e);
+        }
+    }
+
+    /**
+     * Identifier used in trace messages: the connector's id, the tag
+     * {@code " SUB "} and the part of {@code toString()} after the {@code '@'}.
+     * Built on first use and then reused.
+     */
+    private String id() {
+        if (id == null) {
+            // include our short object Id
+            String self = toString();
+            id = connector.id() + " SUB " + self.substring(self.indexOf('@') + 1);
+        }
+        return id;
+    }
+
+    /**
+     * Close this subscriber by asking the connector to close it.
+     *
+     * @throws Exception if the connector's {@code close(this)} fails
+     */
+    @Override
+    public void close() throws Exception {
+        connector.close(this);
+    }
+
+    /**
+     * Common part of the record views: topic, partition and offset are
+     * read straight from the wrapped {@code MessageAndMetadata}.
+     */
+    private static abstract class ConsumerRecordBase<K,V>
+                        implements KafkaConsumer.ConsumerRecord<K,V> {
+        protected final MessageAndMetadata<byte[], byte[]> rec;
+
+        ConsumerRecordBase(MessageAndMetadata<byte[], byte[]> rec) {
+            this.rec = rec;
+        }
+
+        public abstract K key();
+        public abstract V value();
+
+        @Override
+        public String topic() { return rec.topic(); }
+        @Override
+        public int partition() { return rec.partition(); }
+        @Override
+        public long offset() { return rec.offset(); }
+    }
+
+    /** Record view exposing the key and value as the raw byte arrays. */
+    private static class ByteConsumerRecord extends ConsumerRecordBase<byte[],byte[]>
+                                implements KafkaConsumer.ByteConsumerRecord {
+
+        ByteConsumerRecord(MessageAndMetadata<byte[], byte[]> rec) {
+            super(rec);
+        }
+
+        @Override
+        public byte[] key() { return rec.key(); }
+        @Override
+        public byte[] value() { return rec.message(); }
+    }
+
+    /**
+     * Record view exposing the key and value as UTF-8 decoded strings.
+     * A null key or value is returned as null.
+     */
+    private static class StringConsumerRecord extends ConsumerRecordBase<String,String>
+                                implements KafkaConsumer.StringConsumerRecord {
+
+        StringConsumerRecord(MessageAndMetadata<byte[], byte[]> rec) {
+            super(rec);
+        }
+
+        @Override
+        public String key() {
+            byte[] key = rec.key();
+            if (key == null)
+                return null;
+            return new String(key, StandardCharsets.UTF_8);
+        }
+        @Override
+        public String value() {
+            // A record may carry no value; return null as key() does
+            // instead of failing with a NullPointerException.
+            byte[] value = rec.message();
+            if (value == null)
+                return null;
+            return new String(value, StandardCharsets.UTF_8);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java b/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
deleted file mode 100644
index 9edb0a2..0000000
--- a/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.kafka;
-
-/**
- * KafkaStreams connector globalization tests.
- */
-public class KafkaStreamsGlobalTestManual extends KafkaStreamsTestManual {
-    private static final String globalMsg1 = "\u4f60\u597d";
-    private static final String globalMsg2 = "\u4f60\u5728\u55ce";
-
-    public String getMsg1() {
-        return globalMsg1;
-    }
-
-    public String getMsg2() {
-        return globalMsg2;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsTestManual.java b/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
deleted file mode 100644
index 8f92fec..0000000
--- a/connectors/kafka/src/test/java/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.kafka;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import edgent.test.connectors.common.ConnectorTestBase;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.plumbing.PlumbingStreams;
-import edgent.connectors.kafka.KafkaConsumer;
-import edgent.connectors.kafka.KafkaProducer;
-
-public class KafkaStreamsTestManual extends ConnectorTestBase {
-    private static final int PUB_DELAY_MSEC = 4*1000;
-    private static final int SEC_TIMEOUT = 10;
-    private final String BASE_GROUP_ID = "kafkaStreamsTestGroupId";
-    private final String uniq = simpleTS();
-    private final String msg1 = "Hello";
-    private final String msg2 = "Are you there?";
-    
-    public String getMsg1() {
-        return msg1;
-    }
-
-    public String getMsg2() {
-        return msg2;
-    }
-
-    private String[] getKafkaTopics() {
-        String csvTopics = System.getProperty("edgent.test.connectors.kafka.csvTopics", "testTopic1,testTopic2");
-        String[] topics = csvTopics.split(",");
-        return topics;
-    }
-    
-    private String getKafkaBootstrapServers() {
-        return System.getProperty("edgent.test.connectors.kafka.bootstrap.servers", "localhost:9092");
-    }
-    
-    private String getKafkaZookeeperConnect() {
-        return System.getProperty("edgent.test.connectors.kafka.zookeeper.connect", "localhost:2181");
-    }
-    
-    private String newGroupId(String name) {
-        String groupId = BASE_GROUP_ID + "_" + name + "_" + uniq.replaceAll(":", "");
-        System.out.println("["+simpleTS()+"] "
-                + "Using Kafka consumer group.id " + groupId);
-        return groupId;
-    }
-
-    private Map<String,Object> newConsumerConfig(String groupId) {
-        Map<String,Object> config = new HashMap<>();
-        // unbaked 8.8.2 KafkaConsumer
-//        config.put("bootstrap.servers", getKafkaBootstrapServers());
-        config.put("zookeeper.connect", getKafkaZookeeperConnect());
-        config.put("group.id", groupId);
-        return config;
-    }
-    
-    private Map<String,Object> newProducerConfig() {
-        Map<String,Object> config = new HashMap<>();
-        config.put("bootstrap.servers", getKafkaBootstrapServers());
-        return config;
-    }
-    
-    private static class Rec {
-        String topic;
-        int partition;
-        String key;
-        String value;
-        Rec(String topic, int partition, String key, String value) {
-            this.topic = topic;
-            this.key = key;
-            this.value = value;
-        }
-        public String toString() {
-            return "topic:"+topic+" partition:"+partition+" key:"+key+" value:"+value;
-        }
-    }
-
-    @Test
-    public void testSimple() throws Exception {
-        Topology t = newTopology("testSimple");
-        MsgGenerator mgen = new MsgGenerator(t.getName());
-        String topic = getKafkaTopics()[0];
-        String groupId = newGroupId(t.getName());
-        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
-        
-        TStream<String> s = PlumbingStreams.blockingOneShotDelay(
-                        t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-        
-        Map<String,Object> pConfig = newProducerConfig();
-        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-        
-        TSink<String> sink = producer.publish(s, topic);
-        
-        Map<String,Object> cConfig = newConsumerConfig(groupId);
-        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-        
-        TStream<String> rcvd = consumer.subscribe(
-                    rec -> rec.value(),
-                    topic);
-
-        completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
-        
-        assertNotNull(sink);
-    }
-
-    @Test
-    public void testWithKey() throws Exception {
-        Topology t = newTopology("testWithKey");
-        MsgGenerator mgen = new MsgGenerator(t.getName());
-        String topic = getKafkaTopics()[0];
-        String groupId = newGroupId(t.getName());
-        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
-        List<Rec> recs = new ArrayList<>();
-        int i = 0;
-        for (String msg : msgs) {
-            recs.add(new Rec(topic, 0, "key-" + ++i, msg));
-        }
-        List<String> expected = new ArrayList<>();
-        for (Rec rec : recs) {
-            expected.add(rec.toString());
-        }
-        
-        // Test publish with key
-        // Also exercise ConsumerRecord accessors
-        
-        TStream<Rec> s = PlumbingStreams.blockingOneShotDelay(
-                t.collection(recs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-        
-        Map<String,Object> pConfig = newProducerConfig();
-        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-        
-        producer.publish(s,
-                    tuple -> tuple.key,
-                    tuple -> tuple.value,
-                    tuple -> tuple.topic,
-                    tuple -> tuple.partition);
-        
-        Map<String,Object> cConfig = newConsumerConfig(groupId);
-        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-        
-        TStream<String> rcvd = consumer.subscribe(
-                    rec -> new Rec(rec.topic(),
-                                rec.partition(),
-                                rec.key(),
-                                rec.value()).toString(),
-                    topic);
-
-        completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
-    }
-
-    @Test
-    public void testPubSubBytes() throws Exception {
-        Topology t = newTopology("testPubSubBytes");
-        MsgGenerator mgen = new MsgGenerator(t.getName());
-        String topic = getKafkaTopics()[0];
-        String groupId = newGroupId(t.getName());
-        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
-        List<Rec> recs = new ArrayList<>();
-        int i = 0;
-        for (String msg : msgs) {
-            recs.add(new Rec(topic, 0, "key-" + ++i, msg));
-        }
-        List<String> expected = new ArrayList<>();
-        for (Rec rec : recs) {
-            expected.add(rec.toString());
-        }
-        
-        // Test publishBytes() / subscribeBytes()
-        
-        TStream<Rec> s = PlumbingStreams.blockingOneShotDelay(
-                t.collection(recs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-        
-        Map<String,Object> pConfig = newProducerConfig();
-        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-        
-        producer.publishBytes(s,
-                    tuple -> tuple.key.getBytes(StandardCharsets.UTF_8),
-                    tuple -> tuple.value.getBytes(StandardCharsets.UTF_8),
-                    tuple -> tuple.topic,
-                    tuple -> tuple.partition);
-        
-        Map<String,Object> cConfig = newConsumerConfig(groupId);
-        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-        
-        TStream<String> rcvd = consumer.subscribeBytes(
-                    rec -> new Rec(rec.topic(),
-                                rec.partition(),
-                                new String(rec.key(), StandardCharsets.UTF_8),
-                                new String(rec.value(), StandardCharsets.UTF_8)).toString(),
-                    topic);
-
-        completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
-    }
-
-    @Test
-    public void testMultiPub() throws Exception {
-        Topology t = newTopology("testMultiPub");
-        MsgGenerator mgen = new MsgGenerator(t.getName());
-        String topic1 = getKafkaTopics()[0];
-        String topic2 = getKafkaTopics()[1];
-        String groupId = newGroupId(t.getName());
-        List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
-        List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
-        List<String> msgs = new ArrayList<>(msgs1);
-        msgs.addAll(msgs2);
-        
-        // Multiple publish() on a single connection.
-        // Also multi-topic subscribe().
-        
-        TStream<String> s1 = PlumbingStreams.blockingOneShotDelay(
-                t.collection(msgs1), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-        
-        TStream<String> s2 = PlumbingStreams.blockingOneShotDelay(
-                t.collection(msgs2), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-        
-        Map<String,Object> pConfig = newProducerConfig();
-        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-        
-        TSink<String> sink1 = producer.publish(s1, topic1);
-        TSink<String> sink2 = producer.publish(s2, topic2);
-        
-        Map<String,Object> cConfig = newConsumerConfig(groupId);
-        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-        
-        TStream<String> rcvd = consumer.subscribe(
-                    rec -> rec.value(),
-                    topic1, topic2);
-
-        completeAndValidate(false/*ordered*/, "", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
-        
-        assertNotNull(sink1);
-        assertNotNull(sink2);
-        assertNotSame(sink1, sink2);
-    }
-
-    @Test(expected=IllegalStateException.class)
-    public void testMultiSubNeg() throws Exception {
-        Topology t = newTopology("testMultiSubNeg");
-        MsgGenerator mgen = new MsgGenerator(t.getName());
-        String topic1 = getKafkaTopics()[0];
-        String topic2 = getKafkaTopics()[1];
-        String groupId = newGroupId(t.getName());
-        List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
-        List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
-        
-        // Multiple subscribe() on a single connection.
-        // Currently, w/Kafka0.8.2.2, we only support a single
-        // subscriber on the connection and an IllegalStateException
-        // is thrown.
-        // This restriction will be removed when we migrate to Kafka 0.9.0.0
-        
-        TStream<String> s1 = PlumbingStreams.blockingOneShotDelay(
-                t.collection(msgs1), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-        
-        TStream<String> s2 = PlumbingStreams.blockingOneShotDelay(
-                t.collection(msgs2), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
-        
-        Map<String,Object> pConfig = newProducerConfig();
-        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
-        
-        producer.publish(s1, topic1);
-        producer.publish(s2, topic2);
-        
-        Map<String,Object> cConfig = newConsumerConfig(groupId);
-        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-        
-        @SuppressWarnings("unused")
-        TStream<String> rcvd1 = consumer.subscribe(
-                    rec -> rec.value(),
-                    topic1);
-        
-        @SuppressWarnings("unused")
-        TStream<String> rcvd2 = consumer.subscribe(
-                    rec -> rec.value(),
-                    topic2);
-        
-        // TODO see "single subscribe" restriction above
-        
-//        // TODO union() is NYI
-////        TStream<String> rcvd = rcvd1.union(rcvd2);
-////
-////        completeAndValidate(false/*ordered*/, "", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
-//        
-//        Condition<Long> tc1 = t.getTester().tupleCount(rcvd1, msgs1.size());
-//        Condition<Long> tc2 = t.getTester().tupleCount(rcvd2, msgs2.size());
-//        
-//        List<Condition<Long>> conditions = new ArrayList<>();
-//        conditions.add(tc1);
-//        conditions.add(tc2);
-//        Condition<?> tc = tc1.and(tc2);
-//
-//        Condition<List<String>> contents1 = t.getTester().streamContents(rcvd1, msgs1.toArray(new String[0]));
-//        Condition<List<String>> contents2 = t.getTester().streamContents(rcvd2, msgs2.toArray(new String[0]));
-//
-//        complete(t, tc, SEC_TIMEOUT, TimeUnit.SECONDS);
-//
-//        assertTrue(groupId + " contents1:" + contents1.getResult(), contents1.valid());
-//        assertTrue(groupId + " contents2:" + contents2.getResult(), contents2.valid());
-//        assertTrue("valid:" + tc, tc.valid());
-    }
-
-    @Test(expected=IllegalArgumentException.class)
-    public void testNoTopicSubNeg() throws Exception {
-        Topology t = newTopology("testNoTopicSubNeg");
-        String groupId = newGroupId(t.getName());
-        
-        Map<String,Object> cConfig = newConsumerConfig(groupId);
-        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-        
-        consumer.subscribe(rec -> rec.value()/*, "topic1"*/);
-    }
-
-    @Test(expected=IllegalArgumentException.class)
-    public void testDupTopicSub1Neg() throws Exception {
-        Topology t = newTopology("testDupTopicSub1Neg");
-        String groupId = newGroupId(t.getName());
-        
-        Map<String,Object> cConfig = newConsumerConfig(groupId);
-        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-        
-        consumer.subscribe(rec -> rec.value(), "topic1", "topic1");
-    }
-
-    @Test(expected=IllegalArgumentException.class)
-    public void testDupTopicSub2Neg() throws Exception {
-        Topology t = newTopology("testDupTopicSub2Neg");
-        String groupId = newGroupId(t.getName());
-        
-        Map<String,Object> cConfig = newConsumerConfig(groupId);
-        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
-        
-        consumer.subscribe(rec -> rec.value(), "topic1");
-        consumer.subscribe(rec -> rec.value(), "topic1");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
new file mode 100644
index 0000000..3c45b68
--- /dev/null
+++ b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.kafka;
+
+/**
+ * KafkaStreams connector globalization tests.
+ */
+public class KafkaStreamsGlobalTestManual extends KafkaStreamsTestManual {
+    // Non-ASCII (CJK) message texts, written as Unicode escapes.
+    private static final String globalMsg1 = "\u4f60\u597d";
+    private static final String globalMsg2 = "\u4f60\u5728\u55ce";
+
+    /** Returns the non-ASCII replacement for the superclass's first message. */
+    @Override
+    public String getMsg1() {
+        return globalMsg1;
+    }
+
+    /** Returns the non-ASCII replacement for the superclass's second message. */
+    @Override
+    public String getMsg2() {
+        return globalMsg2;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
new file mode 100644
index 0000000..4328af8
--- /dev/null
+++ b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
@@ -0,0 +1,365 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.kafka;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.kafka.KafkaConsumer;
+import org.apache.edgent.connectors.kafka.KafkaProducer;
+import org.apache.edgent.test.connectors.common.ConnectorTestBase;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.junit.Test;
+
+public class KafkaStreamsTestManual extends ConnectorTestBase {
+    // Delay applied (via blockingOneShotDelay) before the tuples to publish are
+    // released; presumably gives the subscriber time to connect first.
+    private static final int PUB_DELAY_MSEC = 4*1000;
+    // Timeout handed to completeAndValidate(); presumably seconds, per the name.
+    private static final int SEC_TIMEOUT = 10;
+    // Common prefix of every consumer group.id built by newGroupId().
+    private final String BASE_GROUP_ID = "kafkaStreamsTestGroupId";
+    // simpleTS() value captured once per test instance (presumably a timestamp);
+    // appended to the group.id so each instance uses its own consumer group.
+    private final String uniq = simpleTS();
+    // Default message payloads, exposed through getMsg1() / getMsg2().
+    private final String msg1 = "Hello";
+    private final String msg2 = "Are you there?";
+    
+    /**
+     * Returns the text used as the first message payload in the tests.
+     * Overridable so a subclass can run the same tests with different text.
+     * @return the first message
+     */
+    public String getMsg1() {
+        return msg1;
+    }
+
+    /**
+     * Returns the text used as the second message payload in the tests.
+     * Overridable so a subclass can run the same tests with different text.
+     * @return the second message
+     */
+    public String getMsg2() {
+        return msg2;
+    }
+
+    /**
+     * Gets the Kafka topics the tests publish to and subscribe from.
+     * <p>
+     * The names come from the comma-separated system property
+     * {@code org.apache.edgent.test.connectors.kafka.csvTopics}, which
+     * defaults to {@code testTopic1,testTopic2}.
+     * @return the topic names, in the order listed in the property
+     */
+    private String[] getKafkaTopics() {
+        return System
+                .getProperty("org.apache.edgent.test.connectors.kafka.csvTopics", "testTopic1,testTopic2")
+                .split(",");
+    }
+    
+    /**
+     * Gets the value used for the producer's {@code bootstrap.servers} setting.
+     * Read from system property
+     * {@code org.apache.edgent.test.connectors.kafka.bootstrap.servers},
+     * defaulting to {@code localhost:9092}.
+     * @return the bootstrap servers string
+     */
+    private String getKafkaBootstrapServers() {
+        return System.getProperty("org.apache.edgent.test.connectors.kafka.bootstrap.servers", "localhost:9092");
+    }
+    
+    /**
+     * Gets the value used for the consumer's {@code zookeeper.connect} setting.
+     * Read from system property
+     * {@code org.apache.edgent.test.connectors.kafka.zookeeper.connect},
+     * defaulting to {@code localhost:2181}.
+     * @return the ZooKeeper connect string
+     */
+    private String getKafkaZookeeperConnect() {
+        return System.getProperty("org.apache.edgent.test.connectors.kafka.zookeeper.connect", "localhost:2181");
+    }
+    
+    /**
+     * Builds a consumer {@code group.id} for one test and logs it to stdout.
+     * <p>
+     * The id has the form {@code <BASE_GROUP_ID>_<name>_<uniq>}, with any
+     * {@code ':'} characters stripped from the {@code uniq} part.
+     * @param name label identifying the test (callers pass the topology name)
+     * @return the group id
+     */
+    private String newGroupId(String name) {
+        String uniqNoColons = uniq.replace(":", "");
+        String id = BASE_GROUP_ID + "_" + name + "_" + uniqNoColons;
+        System.out.println("[" + simpleTS() + "] Using Kafka consumer group.id " + id);
+        return id;
+    }
+
+    /**
+     * Creates the configuration map for a test {@code KafkaConsumer}.
+     * <p>
+     * Sets {@code zookeeper.connect} (from {@link #getKafkaZookeeperConnect()})
+     * and {@code group.id}.
+     * @param groupId the consumer group id to use
+     * @return a new mutable configuration map
+     */
+    private Map<String,Object> newConsumerConfig(String groupId) {
+        Map<String,Object> config = new HashMap<>();
+        // NOTE(review): the original note reads "unbaked 8.8.2 KafkaConsumer";
+        // presumably it means the Kafka 0.8.2 bootstrap.servers-style consumer
+        // is not ready yet, hence the disabled setting below and the use of
+        // zookeeper.connect instead - confirm.
+//        config.put("bootstrap.servers", getKafkaBootstrapServers());
+        config.put("zookeeper.connect", getKafkaZookeeperConnect());
+        config.put("group.id", groupId);
+        return config;
+    }
+    
+    /**
+     * Creates the configuration map for a test {@code KafkaProducer}.
+     * Only {@code bootstrap.servers} is set, from
+     * {@link #getKafkaBootstrapServers()}.
+     * @return a new mutable configuration map
+     */
+    private Map<String,Object> newProducerConfig() {
+        Map<String,Object> config = new HashMap<>();
+        config.put("bootstrap.servers", getKafkaBootstrapServers());
+        return config;
+    }
+    
+    /**
+     * Simple tuple used by the keyed publish/subscribe tests: the topic,
+     * partition, key and value of a single Kafka record.
+     * <p>
+     * {@link #toString()} renders all four fields and is the form the tests
+     * compare, so the constructor must capture every argument.
+     */
+    private static class Rec {
+        final String topic;
+        final int partition;
+        final String key;
+        final String value;
+
+        /**
+         * @param topic the record's topic
+         * @param partition the record's partition
+         * @param key the record's key
+         * @param value the record's value
+         */
+        Rec(String topic, int partition, String key, String value) {
+            this.topic = topic;
+            // This assignment was previously missing, so the field stayed 0
+            // regardless of the argument and toString() could never reflect
+            // the partition of a consumed record.
+            this.partition = partition;
+            this.key = key;
+            this.value = value;
+        }
+
+        /**
+         * @return the record as {@code topic:T partition:P key:K value:V}
+         */
+        @Override
+        public String toString() {
+            return "topic:"+topic+" partition:"+partition+" key:"+key+" value:"+value;
+        }
+    }
+
+    /**
+     * Basic round trip: publishes the generated messages to the first
+     * configured topic and checks that a subscriber on that topic receives
+     * them.
+     * <p>
+     * Publishing is held back by {@code PUB_DELAY_MSEC} through
+     * {@code blockingOneShotDelay}. Also checks that {@code publish()}
+     * returns a non-null sink.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testSimple() throws Exception {
+        Topology t = newTopology("testSimple");
+        MsgGenerator mgen = new MsgGenerator(t.getName());
+        String topic = getKafkaTopics()[0];
+        String groupId = newGroupId(t.getName());
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
+        
+        TStream<String> s = PlumbingStreams.blockingOneShotDelay(
+                        t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+        
+        Map<String,Object> pConfig = newProducerConfig();
+        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+        
+        TSink<String> sink = producer.publish(s, topic);
+        
+        Map<String,Object> cConfig = newConsumerConfig(groupId);
+        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+        
+        // Each received record is reduced to its value for comparison.
+        TStream<String> rcvd = consumer.subscribe(
+                    rec -> rec.value(),
+                    topic);
+
+        completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
+        
+        assertNotNull(sink);
+    }
+
+    /**
+     * Publishes records with an explicit key, value, topic and partition,
+     * and exercises the consumer record accessors on the receiving side.
+     * <p>
+     * Each generated message is wrapped in a {@code Rec} created with
+     * partition 0 and key {@code key-N} (N starting at 1). The subscriber
+     * rebuilds a {@code Rec} from each consumed record, and its string form
+     * is compared with the string form of the published one.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testWithKey() throws Exception {
+        Topology t = newTopology("testWithKey");
+        MsgGenerator mgen = new MsgGenerator(t.getName());
+        String topic = getKafkaTopics()[0];
+        String groupId = newGroupId(t.getName());
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
+        List<Rec> recs = new ArrayList<>();
+        int i = 0;
+        for (String msg : msgs) {
+            recs.add(new Rec(topic, 0, "key-" + ++i, msg));
+        }
+        // Expected output is the string form of every record to publish.
+        List<String> expected = new ArrayList<>();
+        for (Rec rec : recs) {
+            expected.add(rec.toString());
+        }
+        
+        // Test publish with key
+        // Also exercise ConsumerRecord accessors
+        
+        TStream<Rec> s = PlumbingStreams.blockingOneShotDelay(
+                t.collection(recs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+        
+        Map<String,Object> pConfig = newProducerConfig();
+        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+        
+        producer.publish(s,
+                    tuple -> tuple.key,
+                    tuple -> tuple.value,
+                    tuple -> tuple.topic,
+                    tuple -> tuple.partition);
+        
+        Map<String,Object> cConfig = newConsumerConfig(groupId);
+        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+        
+        TStream<String> rcvd = consumer.subscribe(
+                    rec -> new Rec(rec.topic(),
+                                rec.partition(),
+                                rec.key(),
+                                rec.value()).toString(),
+                    topic);
+
+        completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
+    }
+
+    /**
+     * Round trip through the byte-oriented API: {@code publishBytes()} and
+     * {@code subscribeBytes()}.
+     * <p>
+     * Keys and values are encoded to UTF-8 bytes for publishing and decoded
+     * from UTF-8 on receipt. The string form of a {@code Rec} rebuilt from
+     * each consumed record is compared with that of the published one.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testPubSubBytes() throws Exception {
+        Topology t = newTopology("testPubSubBytes");
+        MsgGenerator mgen = new MsgGenerator(t.getName());
+        String topic = getKafkaTopics()[0];
+        String groupId = newGroupId(t.getName());
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
+        List<Rec> recs = new ArrayList<>();
+        int i = 0;
+        for (String msg : msgs) {
+            recs.add(new Rec(topic, 0, "key-" + ++i, msg));
+        }
+        // Expected output is the string form of every record to publish.
+        List<String> expected = new ArrayList<>();
+        for (Rec rec : recs) {
+            expected.add(rec.toString());
+        }
+        
+        // Test publishBytes() / subscribeBytes()
+        
+        TStream<Rec> s = PlumbingStreams.blockingOneShotDelay(
+                t.collection(recs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+        
+        Map<String,Object> pConfig = newProducerConfig();
+        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+        
+        producer.publishBytes(s,
+                    tuple -> tuple.key.getBytes(StandardCharsets.UTF_8),
+                    tuple -> tuple.value.getBytes(StandardCharsets.UTF_8),
+                    tuple -> tuple.topic,
+                    tuple -> tuple.partition);
+        
+        Map<String,Object> cConfig = newConsumerConfig(groupId);
+        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+        
+        TStream<String> rcvd = consumer.subscribeBytes(
+                    rec -> new Rec(rec.topic(),
+                                rec.partition(),
+                                new String(rec.key(), StandardCharsets.UTF_8),
+                                new String(rec.value(), StandardCharsets.UTF_8)).toString(),
+                    topic);
+
+        completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
+    }
+
+    /**
+     * Multiple {@code publish()} calls on one producer connector, combined
+     * with a single {@code subscribe()} covering both topics.
+     * <p>
+     * One message set is published to each of the first two configured
+     * topics. The received stream is validated without regard to order
+     * against the union of both sets. Also checks that the two
+     * {@code publish()} calls return distinct, non-null sinks.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMultiPub() throws Exception {
+        Topology t = newTopology("testMultiPub");
+        MsgGenerator mgen = new MsgGenerator(t.getName());
+        String topic1 = getKafkaTopics()[0];
+        String topic2 = getKafkaTopics()[1];
+        String groupId = newGroupId(t.getName());
+        List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+        List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
+        List<String> msgs = new ArrayList<>(msgs1);
+        msgs.addAll(msgs2);
+        
+        // Multiple publish() on a single connection.
+        // Also multi-topic subscribe().
+        
+        TStream<String> s1 = PlumbingStreams.blockingOneShotDelay(
+                t.collection(msgs1), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+        
+        TStream<String> s2 = PlumbingStreams.blockingOneShotDelay(
+                t.collection(msgs2), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+        
+        Map<String,Object> pConfig = newProducerConfig();
+        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+        
+        TSink<String> sink1 = producer.publish(s1, topic1);
+        TSink<String> sink2 = producer.publish(s2, topic2);
+        
+        Map<String,Object> cConfig = newConsumerConfig(groupId);
+        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+        
+        TStream<String> rcvd = consumer.subscribe(
+                    rec -> rec.value(),
+                    topic1, topic2);
+
+        completeAndValidate(false/*ordered*/, "", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
+        
+        assertNotNull(sink1);
+        assertNotNull(sink2);
+        assertNotSame(sink1, sink2);
+    }
+
+    /**
+     * Negative test: a second {@code subscribe()} on the same consumer
+     * connector is expected to fail with {@link IllegalStateException}.
+     * <p>
+     * The commented-out code at the end is the positive validation that
+     * would apply once the single-subscriber restriction described in the
+     * method body is lifted.
+     * @throws Exception on failure
+     */
+    @Test(expected=IllegalStateException.class)
+    public void testMultiSubNeg() throws Exception {
+        Topology t = newTopology("testMultiSubNeg");
+        MsgGenerator mgen = new MsgGenerator(t.getName());
+        String topic1 = getKafkaTopics()[0];
+        String topic2 = getKafkaTopics()[1];
+        String groupId = newGroupId(t.getName());
+        List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+        List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
+        
+        // Multiple subscribe() on a single connection.
+        // Currently, w/Kafka0.8.2.2, we only support a single
+        // subscriber on the connection and an IllegalStateException
+        // is thrown.
+        // This restriction will be removed when we migrate to Kafka 0.9.0.0
+        
+        TStream<String> s1 = PlumbingStreams.blockingOneShotDelay(
+                t.collection(msgs1), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+        
+        TStream<String> s2 = PlumbingStreams.blockingOneShotDelay(
+                t.collection(msgs2), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+        
+        Map<String,Object> pConfig = newProducerConfig();
+        KafkaProducer producer = new KafkaProducer(t, () -> pConfig);
+        
+        producer.publish(s1, topic1);
+        producer.publish(s2, topic2);
+        
+        Map<String,Object> cConfig = newConsumerConfig(groupId);
+        KafkaConsumer consumer = new KafkaConsumer(t, () -> cConfig);
+        
+        @SuppressWarnings("unused")
+        TStream<String> rcvd1 = consumer.subscribe(
+                    rec -> rec.value(),
+                    topic1);
+        
+        @SuppressWarnings("unused")
+        TStream<String> rcvd2 = consumer.subscribe(
+                    rec -> rec.value(),
+                    topic2);
+        
+        // TODO see "single subscribe" restriction above
+        
+//        // TODO union() is NYI
+////        TStream<String> rcvd = rcvd1.union(rcvd2);
+////
+////        completeAndValidate(false/*ordered*/, "", t, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
+//        
+//        Condition<Long> tc1 = t.getTester().tupleCount(rcvd1, msgs1.size());
+//        Condition<Long> tc2 = t.getTester().tupleCount(rcvd2, msgs2.size());
+//        
+//        List<Condition<Long>> conditions = new ArrayList<>();
+//        conditions.add(tc1);
+//        conditions.add(tc2);
+//        Condition<?> tc = tc1.and(tc2);
+//
+//        Condition<List<String>> contents1 = t.getTester().streamContents(rcvd1, msgs1.toArray(new String[0]));
+//        Condition<List<String>> contents2 = t.getTester().streamContents(rcvd2, msgs2.toArray(new String[0]));
+//
+//        complete(t, tc, SEC_TIMEOUT, TimeUnit.SECONDS);
+//
+//        assertTrue(groupId + " contents1:" + contents1.getResult(), contents1.valid());
+//        assertTrue(groupId + " contents2:" + contents2.getResult(), contents2.valid());
+//        assertTrue("valid:" + tc, tc.valid());
+    }
+
+    /**
+     * Negative test: {@code subscribe()} called without any topic is
+     * expected to fail with {@link IllegalArgumentException}.
+     * @throws Exception on failure
+     */
+    @Test(expected=IllegalArgumentException.class)
+    public void testNoTopicSubNeg() throws Exception {
+        Topology top = newTopology("testNoTopicSubNeg");
+        Map<String,Object> consumerConfig = newConsumerConfig(newGroupId(top.getName()));
+        KafkaConsumer kafka = new KafkaConsumer(top, () -> consumerConfig);
+
+        // Deliberately pass no topic arguments.
+        kafka.subscribe(rec -> rec.value());
+    }
+
+    /**
+     * Negative test: naming the same topic twice in a single
+     * {@code subscribe()} call is expected to fail with
+     * {@link IllegalArgumentException}.
+     * @throws Exception on failure
+     */
+    @Test(expected=IllegalArgumentException.class)
+    public void testDupTopicSub1Neg() throws Exception {
+        Topology top = newTopology("testDupTopicSub1Neg");
+        Map<String,Object> consumerConfig = newConsumerConfig(newGroupId(top.getName()));
+        KafkaConsumer kafka = new KafkaConsumer(top, () -> consumerConfig);
+
+        // Same topic listed twice in one call.
+        kafka.subscribe(rec -> rec.value(), "topic1", "topic1");
+    }
+
+    /**
+     * Negative test: subscribing to the same topic in two separate
+     * {@code subscribe()} calls on one consumer is expected to fail with
+     * {@link IllegalArgumentException}.
+     * @throws Exception on failure
+     */
+    @Test(expected=IllegalArgumentException.class)
+    public void testDupTopicSub2Neg() throws Exception {
+        Topology top = newTopology("testDupTopicSub2Neg");
+        Map<String,Object> consumerConfig = newConsumerConfig(newGroupId(top.getName()));
+        KafkaConsumer kafka = new KafkaConsumer(top, () -> consumerConfig);
+
+        // Two separate calls, both naming the same topic.
+        kafka.subscribe(rec -> rec.value(), "topic1");
+        kafka.subscribe(rec -> rec.value(), "topic1");
+    }
+
+}