You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/23 18:01:16 UTC

[3/4] incubator-streams git commit: merge - tests still aren't passing

merge - tests still aren't passing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c503011c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c503011c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c503011c

Branch: refs/heads/STREAMS-49
Commit: c503011c4465a04f01e321d5193968d1e96e0b12
Parents: 17374fb
Author: sblackmon <sb...@apache.org>
Authored: Sun Nov 23 10:57:20 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Sun Nov 23 10:57:20 2014 -0600

----------------------------------------------------------------------
 streams-contrib/streams-persist-kafka/pom.xml   | 24 ++++++++
 .../streams/kafka/KafkaPersistReaderTask.java   | 62 ++++++++++++++++++++
 .../streams/kafka/KafkaPersistWriter.java       | 11 +++-
 .../streams/kafka/StreamsPartitioner.java       |  4 +-
 .../streams/kafka/test/TestKafkaPersist.java    | 14 +++++
 5 files changed, 110 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml
index 6d98c36..f56885c 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -109,6 +109,30 @@
             <groupId>org.powermock</groupId>
             <artifactId>powermock-api-mockito</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
new file mode 100644
index 0000000..03fa291
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.kafka;
+
+import com.google.common.base.Preconditions;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/** Runnable that copies every message of one {@link KafkaStream} into the reader's persist queue. */
+public class KafkaPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);
+
+    private final KafkaPersistReader reader;
+    private final KafkaStream<String,String> stream;
+
+    /** Stores the reader to feed and the stream to drain; the stream is null-checked in {@link #run()}. */
+    public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) {
+        this.reader = reader;
+        this.stream = stream;
+    }
+
+    /** Queues each message as a {@link StreamsDatum}, then sleeps a random interval under 100 ms. */
+    @Override
+    public void run() {
+        Preconditions.checkNotNull(this.stream);
+        ConsumerIterator<String, String> it = stream.iterator();
+        while (it.hasNext()) {
+            MessageAndMetadata<String,String> item = it.next();
+            reader.persistQueue.add(new StreamsDatum(item.message()));
+        }
+
+        try {
+            Thread.sleep(new Random().nextInt(100));
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the code that owns this thread can still see the request.
+            Thread.currentThread().interrupt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 7e300a8..e832029 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -60,12 +60,18 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
     public KafkaPersistWriter(KafkaConfiguration config) {
         this.config = config;
     }
+    /** Returns this writer's {@code persistQueue} reference directly; no copy is made. */
+    public Queue<StreamsDatum> getPersistQueue() {
+        return persistQueue;
+    }
 
     @Override
     public void write(StreamsDatum entry) {
 
+        Preconditions.checkArgument(entry.getDocument() instanceof String);
+        
         String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("kafkawriter");
-
+        
         Preconditions.checkArgument(Strings.isNullOrEmpty(key) == false);
         Preconditions.checkArgument(entry.getDocument() instanceof String);
         Preconditions.checkArgument(Strings.isNullOrEmpty((String)entry.getDocument()) == false);
@@ -92,8 +98,9 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
 
         producer = new Producer(config);
 
+    @Override
+    public void prepare(Object configurationObject) {
         this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
index b1f2a28..af0a8b9 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
@@ -24,15 +24,13 @@ package org.apache.streams.kafka;
 import kafka.producer.Partitioner;
 import kafka.utils.VerifiableProperties;
 
-public class StreamsPartitioner implements Partitioner {
-
+public class StreamsPartitioner implements Partitioner<> {
     public StreamsPartitioner (VerifiableProperties props) {
 
     }
 
     public int partition(Object key, int a_numPartitions) {
         int partition = 0;
-
         return partition;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c503011c/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
index 572b87e..7fac8ff 100644
--- a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
+++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java
@@ -93,6 +93,11 @@ public class TestKafkaPersist {
             Assert.fail();
         }
 
+<<<<<<< Updated upstream
+=======
+        testPersistReader.startStream();
+
+>>>>>>> Stashed changes
         try {
             Thread.sleep(1000);
         } catch (InterruptedException ie) {
@@ -105,6 +110,14 @@ public class TestKafkaPersist {
 
         assert(testResult.size() == 1);
 
+        StreamsDatum datum = testResult.iterator().next();
+
+        assert(datum.getDocument() instanceof String);
+
+        String datumString = (String) datum.getDocument();
+
+        assert(datumString.contains("dummy") && datumString.contains("true"));
+
     }
 
     @After
@@ -116,5 +129,6 @@ public class TestKafkaPersist {
         } finally {
             testKafkaCluster = null;
         }
+        
     }
 }