You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/16 14:34:50 UTC

[3/6] incubator-quarks git commit: Added split(enumClass, splitter)

Added split(enumClass, splitter)


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/36529a68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/36529a68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/36529a68

Branch: refs/heads/master
Commit: 36529a685d3584059de3f78239af70d1740938a7
Parents: 97113d8
Author: cazen.lee <ca...@samsung.com>
Authored: Tue Mar 15 19:51:51 2016 +0900
Committer: cazen.lee <ca...@samsung.com>
Committed: Tue Mar 15 19:51:51 2016 +0900

----------------------------------------------------------------------
 .../src/main/java/quarks/topology/TStream.java  | 25 ++++--
 .../java/quarks/test/topology/TStreamTest.java  | 80 ++++++++++++++------
 .../samples/topology/SplitWithEnumSample.java   | 67 ++++++++++++++++
 .../topology/spi/graph/ConnectorStream.java     | 48 ++++++++----
 4 files changed, 177 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/36529a68/api/topology/src/main/java/quarks/topology/TStream.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/TStream.java b/api/topology/src/main/java/quarks/topology/TStream.java
index 709341e..355504c 100644
--- a/api/topology/src/main/java/quarks/topology/TStream.java
+++ b/api/topology/src/main/java/quarks/topology/TStream.java
@@ -4,18 +4,15 @@
 */
 package quarks.topology;
 
+import quarks.function.*;
+import quarks.oplet.core.Pipe;
+import quarks.oplet.core.Sink;
+
+import java.util.EnumMap;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import quarks.function.Consumer;
-import quarks.function.Function;
-import quarks.function.Predicate;
-import quarks.function.ToIntFunction;
-import quarks.function.UnaryOperator;
-import quarks.oplet.core.Pipe;
-import quarks.oplet.core.Sink;
-
 /**
  * A {@code TStream} is a declaration of a continuous sequence of tuples. A
  * connected topology of streams and functional transformations is built using
@@ -197,6 +194,18 @@ public interface TStream<T> extends TopologyElement {
     List<TStream<T>> split(int n, ToIntFunction<T> splitter);
 
     /**
+     * Split a stream's tuples among one stream per enum constant of
+     * {@code enumClass}, as selected for each tuple by {@code splitter}.
+     *
+     * @param enumClass
+     *            enum class whose constants key the returned streams
+     * @param splitter
+     *            function returning the enum constant that selects a tuple's stream
+     * @return map from enum constant to the stream receiving that constant's tuples
+     */
+    <E extends Enum<E>> EnumMap<E,TStream<T>> split(Class<E> enumClass, Function<T, E> splitter);
+
+    /**
      * Declare a stream that contains the same contents as this stream while
      * peeking at each element using {@code peeker}. <BR>
      * For each tuple {@code t} on this stream, {@code peeker.accept(t)} will be

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/36529a68/api/topology/src/test/java/quarks/test/topology/TStreamTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/TStreamTest.java b/api/topology/src/test/java/quarks/test/topology/TStreamTest.java
index 8745ed9..5f45525 100644
--- a/api/topology/src/test/java/quarks/test/topology/TStreamTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/TStreamTest.java
@@ -4,35 +4,20 @@
 */
 package quarks.test.topology;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.junit.Ignore;
 import org.junit.Test;
-
 import quarks.topology.TSink;
 import quarks.topology.TStream;
 import quarks.topology.Topology;
 import quarks.topology.tester.Condition;
 
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+
 @Ignore
 public abstract class TStreamTest extends TopologyAbstractTest {
 
@@ -285,6 +270,57 @@ public abstract class TStreamTest extends TopologyAbstractTest {
         newTopology().strings("a1").split(-28, tuple -> 0);
     }
 
+    /**
+     * Severity levels, each paired with an integer code, used as split keys by testSplitWithEnum.
+     */
+    private enum LogSeverityEnum {
+        EMERG(0), ALERT(1), CRITICAL(2), ERROR(3), WARNING(4), NOTICE(5), INFO(6), DEBUG(7);
+
+        private final int code; // set by the constructor; not read anywhere in the visible hunk
+
+        LogSeverityEnum(final int code) {
+            this.code = code;
+        }
+    }
+
+    /**
+     * Test split(enumClass, splitter): each tuple must land on the stream keyed by its severity suffix.
+     */
+    @Test
+    public void testSplitWithEnum() throws Exception {
+        Topology topology = newTopology();
+
+        TStream<String> logs = topology.strings("Log1_ALERT", "Log2_INFO", "Log3_INFO", "Log4_INFO",
+            "Log5_ERROR", "Log6_ERROR", "Log7_CRITICAL");
+        TStream<String> mapped = logs.map(String::toString);
+        EnumMap<LogSeverityEnum,TStream<String>> bySeverity =
+            mapped.split(LogSeverityEnum.class, line -> LogSeverityEnum.valueOf(line.split("_")[1]));
+
+        assertStream(topology, mapped);
+
+        Condition<Long> alertCount = topology.getTester().tupleCount(bySeverity.get(LogSeverityEnum.ALERT), 1);
+        Condition<Long> infoCount = topology.getTester().tupleCount(bySeverity.get(LogSeverityEnum.INFO), 3);
+        Condition<Long> errorCount = topology.getTester().tupleCount(bySeverity.get(LogSeverityEnum.ERROR), 2);
+        Condition<Long> criticalCount = topology.getTester().tupleCount(bySeverity.get(LogSeverityEnum.CRITICAL), 1);
+        Condition<Long> warningCount = topology.getTester().tupleCount(bySeverity.get(LogSeverityEnum.WARNING), 0);
+
+        Condition<List<String>> alertLogs = topology.getTester()
+            .streamContents(bySeverity.get(LogSeverityEnum.ALERT), "Log1_ALERT");
+        Condition<List<String>> infoLogs = topology.getTester()
+            .streamContents(bySeverity.get(LogSeverityEnum.INFO), "Log2_INFO", "Log3_INFO", "Log4_INFO");
+        Condition<List<String>> errorLogs = topology.getTester().streamContents(bySeverity.get(LogSeverityEnum.ERROR), "Log5_ERROR", "Log6_ERROR");
+        Condition<List<String>> criticalLogs = topology.getTester().streamContents(bySeverity.get(LogSeverityEnum.CRITICAL), "Log7_CRITICAL");
+        Condition<List<String>> warningLogs = topology.getTester().streamContents(bySeverity.get(LogSeverityEnum.WARNING));
+
+        complete(topology, topology.getTester().and(alertCount, infoCount, errorCount, criticalCount, warningCount));
+
+        assertTrue(alertLogs.toString(), alertLogs.valid());
+        assertTrue(infoLogs.toString(), infoLogs.valid());
+        assertTrue(errorLogs.toString(), errorLogs.valid());
+        assertTrue(criticalLogs.toString(), criticalLogs.valid());
+        assertTrue(warningLogs.toString(), warningLogs.valid());
+    }
+
     @Test
     public void testFanout2() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/36529a68/samples/topology/src/main/java/quarks/samples/topology/SplitWithEnumSample.java
----------------------------------------------------------------------
diff --git a/samples/topology/src/main/java/quarks/samples/topology/SplitWithEnumSample.java b/samples/topology/src/main/java/quarks/samples/topology/SplitWithEnumSample.java
new file mode 100644
index 0000000..2545ecf
--- /dev/null
+++ b/samples/topology/src/main/java/quarks/samples/topology/SplitWithEnumSample.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package quarks.samples.topology;
+
+import quarks.console.server.HttpServer;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+import java.util.EnumMap;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class SplitWithEnumSample {
+
+    public enum LogSeverityEnum {
+
+        ALERT(1), CRITICAL(2), ERROR(3), WARNING(4), NOTICE(5), INFO(6), DEBUG(7);
+
+        private final int code;
+
+        LogSeverityEnum(final int code) {
+            this.code = code;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        DevelopmentProvider dtp = new DevelopmentProvider();
+
+        Topology t = dtp.newTopology("SplitWithEnumSample");
+
+        Random r = new Random();
+
+        LogSeverityEnum[] values = LogSeverityEnum.values();
+        TStream<String> d = t.poll(() -> values[r.nextInt(values.length)].toString()+ "_Log", 500, TimeUnit.MILLISECONDS);
+
+        EnumMap<LogSeverityEnum, TStream<String>> categories = d
+            .split(LogSeverityEnum.class, e -> LogSeverityEnum.valueOf(e.split("_")[0]));
+
+        TStream<String> warnStream = categories.get(LogSeverityEnum.WARNING);
+        TStream<String> errorStream = categories.get(LogSeverityEnum.ERROR);
+        TStream<String> infoStream = categories.get(LogSeverityEnum.INFO);
+
+        warnStream.sink(data -> System.out.println("warnStream = " + data));
+        errorStream.sink(data -> System.out.println("errorStream = " + data));
+        infoStream.sink(data -> System.out.println("infoStream = " + data));
+
+        dtp.submit(t);
+
+        System.out.println(dtp.getServices().getService(HttpServer.class).getConsoleUrl());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/36529a68/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
----------------------------------------------------------------------
diff --git a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
index 798f1fb..5dc2902 100644
--- a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
+++ b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
@@ -4,19 +4,7 @@
 */
 package quarks.topology.spi.graph;
 
-import static quarks.function.Functions.synchronizedFunction;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import quarks.function.Consumer;
-import quarks.function.Function;
-import quarks.function.Functions;
-import quarks.function.Predicate;
-import quarks.function.ToIntFunction;
+import quarks.function.*;
 import quarks.graph.Connector;
 import quarks.graph.Graph;
 import quarks.graph.Vertex;
@@ -34,6 +22,11 @@ import quarks.topology.TWindow;
 import quarks.topology.Topology;
 import quarks.topology.spi.AbstractTStream;
 
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static quarks.function.Functions.synchronizedFunction;
+
 /**
  * A stream that directly adds oplets to the graph.
  *
@@ -96,6 +89,35 @@ public class ConnectorStream<G extends Topology, T> extends AbstractTStream<G, T
     }
 
     @Override
+    public <E extends Enum<E>> EnumMap<E,TStream<T>> split(Class<E> enumClass, Function<T, E> splitter) {
+        // Splits this stream into one output stream per constant of enumClass and
+        // returns them keyed by constant. Implemented on top of split(int, ToIntFunction),
+        // using each constant's ordinal as its output index.
+
+        E[] es = enumClass.getEnumConstants();
+
+        List<TStream<T>> outputs = split(es.length, new ToIntFunction<T>() {
+            @Override
+            public int applyAsInt(T input) {
+                // Invoke splitter exactly once per tuple. Comparing every constant
+                // against a fresh splitter.apply(input) called the user function up to
+                // es.length times per tuple, which repeats its cost and any side effects.
+                E selected = splitter.apply(input);
+
+                // A null result matches no constant; keep returning -1 for it, as before.
+                return selected == null ? -1 : selected.ordinal();
+            }});
+
+        // The index function returns ordinals, so fetch each constant's stream by ordinal.
+        EnumMap<E,TStream<T>> returnMap = new EnumMap<>(enumClass);
+        for (E e : es) {
+            returnMap.put(e, outputs.get(e.ordinal()));
+        }
+
+        return returnMap;
+    }
+
+    @Override
     public TStream<T> peek(Consumer<T> peeker) {
         peeker = Functions.synchronizedConsumer(peeker);
         connector.peek(new Peek<T>(peeker));