You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/16 14:34:50 UTC
[3/6] incubator-quarks git commit: Added split(enumClass, splitter)
Added split(enumClass, splitter)
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/36529a68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/36529a68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/36529a68
Branch: refs/heads/master
Commit: 36529a685d3584059de3f78239af70d1740938a7
Parents: 97113d8
Author: cazen.lee <ca...@samsung.com>
Authored: Tue Mar 15 19:51:51 2016 +0900
Committer: cazen.lee <ca...@samsung.com>
Committed: Tue Mar 15 19:51:51 2016 +0900
----------------------------------------------------------------------
.../src/main/java/quarks/topology/TStream.java | 25 ++++--
.../java/quarks/test/topology/TStreamTest.java | 80 ++++++++++++++------
.../samples/topology/SplitWithEnumSample.java | 67 ++++++++++++++++
.../topology/spi/graph/ConnectorStream.java | 48 ++++++++----
4 files changed, 177 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/36529a68/api/topology/src/main/java/quarks/topology/TStream.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/TStream.java b/api/topology/src/main/java/quarks/topology/TStream.java
index 709341e..355504c 100644
--- a/api/topology/src/main/java/quarks/topology/TStream.java
+++ b/api/topology/src/main/java/quarks/topology/TStream.java
@@ -4,18 +4,15 @@
*/
package quarks.topology;
+import quarks.function.*;
+import quarks.oplet.core.Pipe;
+import quarks.oplet.core.Sink;
+
+import java.util.EnumMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import quarks.function.Consumer;
-import quarks.function.Function;
-import quarks.function.Predicate;
-import quarks.function.ToIntFunction;
-import quarks.function.UnaryOperator;
-import quarks.oplet.core.Pipe;
-import quarks.oplet.core.Sink;
-
/**
* A {@code TStream} is a declaration of a continuous sequence of tuples. A
* connected topology of streams and functional transformations is built using
@@ -197,6 +194,18 @@ public interface TStream<T> extends TopologyElement {
List<TStream<T>> split(int n, ToIntFunction<T> splitter);
/**
+ * Split a stream's tuples among {@code enumClass.size} streams as specified by
+ * {@code splitter}.
+ *
+ * @param enumClass
+ * enum data to split
+ * @param splitter
+ * the splitter function
+ * @return EnumMap<E,TStream<T>>
+ */
+ <E extends Enum<E>> EnumMap<E,TStream<T>> split(Class<E> enumClass, Function<T, E> splitter);
+
+ /**
* Declare a stream that contains the same contents as this stream while
* peeking at each element using {@code peeker}. <BR>
* For each tuple {@code t} on this stream, {@code peeker.accept(t)} will be
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/36529a68/api/topology/src/test/java/quarks/test/topology/TStreamTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/TStreamTest.java b/api/topology/src/test/java/quarks/test/topology/TStreamTest.java
index 8745ed9..5f45525 100644
--- a/api/topology/src/test/java/quarks/test/topology/TStreamTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/TStreamTest.java
@@ -4,35 +4,20 @@
*/
package quarks.test.topology;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.junit.Ignore;
import org.junit.Test;
-
import quarks.topology.TSink;
import quarks.topology.TStream;
import quarks.topology.Topology;
import quarks.topology.tester.Condition;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+
@Ignore
public abstract class TStreamTest extends TopologyAbstractTest {
@@ -285,6 +270,57 @@ public abstract class TStreamTest extends TopologyAbstractTest {
newTopology().strings("a1").split(-28, tuple -> 0);
}
+ /**
+ * Test enum data structure
+ */
+ private enum LogSeverityEnum {
+ EMERG(0), ALERT(1), CRITICAL(2), ERROR(3), WARNING(4), NOTICE(5), INFO(6), DEBUG(7);
+
+ private final int code;
+
+ LogSeverityEnum(final int code) {
+ this.code = code;
+ }
+ }
+
+ /**
+ * Test split(enum) with integer type enum.
+ */
+ @Test
+ public void testSplitWithEnum() throws Exception {
+
+ Topology t = newTopology();
+
+ TStream<String> s = t.strings("Log1_ALERT", "Log2_INFO", "Log3_INFO", "Log4_INFO", "Log5_ERROR", "Log6_ERROR", "Log7_CRITICAL");
+ TStream<String> i = s.map(String::toString);
+ EnumMap<LogSeverityEnum,TStream<String>> splits = i.split(LogSeverityEnum.class, e -> LogSeverityEnum.valueOf(e.split("_")[1]));
+
+ assertStream(t, i);
+
+ Condition<Long> tc0 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ALERT), 1);
+ Condition<Long> tc1 = t.getTester().tupleCount(splits.get(LogSeverityEnum.INFO), 3);
+ Condition<Long> tc2 = t.getTester().tupleCount(splits.get(LogSeverityEnum.ERROR), 2);
+ Condition<Long> tc3 = t.getTester().tupleCount(splits.get(LogSeverityEnum.CRITICAL), 1);
+ Condition<Long> tc4 = t.getTester().tupleCount(splits.get(LogSeverityEnum.WARNING), 0);
+
+ Condition<List<String>> contents0 = t.getTester().streamContents(splits.get(LogSeverityEnum.ALERT), "Log1_ALERT");
+ Condition<List<String>> contents1 = t.getTester().streamContents(splits.get(LogSeverityEnum.INFO), "Log2_INFO",
+ "Log3_INFO", "Log4_INFO");
+ Condition<List<String>> contents2 = t.getTester().streamContents(splits.get(LogSeverityEnum.ERROR), "Log5_ERROR",
+ "Log6_ERROR");
+ Condition<List<String>> contents3 = t.getTester().streamContents(splits.get(LogSeverityEnum.CRITICAL), "Log7_CRITICAL");
+ Condition<List<String>> contents4 = t.getTester().streamContents(splits.get(LogSeverityEnum.WARNING));
+
+ complete(t, t.getTester().and(tc0, tc1, tc2, tc3, tc4));
+
+
+ assertTrue(contents0.toString(), contents0.valid());
+ assertTrue(contents1.toString(), contents1.valid());
+ assertTrue(contents2.toString(), contents2.valid());
+ assertTrue(contents3.toString(), contents3.valid());
+ assertTrue(contents4.toString(), contents4.valid());
+ }
+
@Test
public void testFanout2() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/36529a68/samples/topology/src/main/java/quarks/samples/topology/SplitWithEnumSample.java
----------------------------------------------------------------------
diff --git a/samples/topology/src/main/java/quarks/samples/topology/SplitWithEnumSample.java b/samples/topology/src/main/java/quarks/samples/topology/SplitWithEnumSample.java
new file mode 100644
index 0000000..2545ecf
--- /dev/null
+++ b/samples/topology/src/main/java/quarks/samples/topology/SplitWithEnumSample.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package quarks.samples.topology;
+
+import quarks.console.server.HttpServer;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+import java.util.EnumMap;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class SplitWithEnumSample {
+
+ public enum LogSeverityEnum {
+
+ ALERT(1), CRITICAL(2), ERROR(3), WARNING(4), NOTICE(5), INFO(6), DEBUG(7);
+
+ private final int code;
+
+ LogSeverityEnum(final int code) {
+ this.code = code;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ DevelopmentProvider dtp = new DevelopmentProvider();
+
+ Topology t = dtp.newTopology("SplitWithEnumSample");
+
+ Random r = new Random();
+
+ LogSeverityEnum[] values = LogSeverityEnum.values();
+ TStream<String> d = t.poll(() -> values[r.nextInt(values.length)].toString()+ "_Log", 500, TimeUnit.MILLISECONDS);
+
+ EnumMap<LogSeverityEnum, TStream<String>> categories = d
+ .split(LogSeverityEnum.class, e -> LogSeverityEnum.valueOf(e.split("_")[0]));
+
+ TStream<String> warnStream = categories.get(LogSeverityEnum.WARNING);
+ TStream<String> errorStream = categories.get(LogSeverityEnum.ERROR);
+ TStream<String> infoStream = categories.get(LogSeverityEnum.INFO);
+
+ warnStream.sink(data -> System.out.println("warnStream = " + data));
+ errorStream.sink(data -> System.out.println("errorStream = " + data));
+ infoStream.sink(data -> System.out.println("infoStream = " + data));
+
+ dtp.submit(t);
+
+ System.out.println(dtp.getServices().getService(HttpServer.class).getConsoleUrl());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/36529a68/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
----------------------------------------------------------------------
diff --git a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
index 798f1fb..5dc2902 100644
--- a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
+++ b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java
@@ -4,19 +4,7 @@
*/
package quarks.topology.spi.graph;
-import static quarks.function.Functions.synchronizedFunction;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import quarks.function.Consumer;
-import quarks.function.Function;
-import quarks.function.Functions;
-import quarks.function.Predicate;
-import quarks.function.ToIntFunction;
+import quarks.function.*;
import quarks.graph.Connector;
import quarks.graph.Graph;
import quarks.graph.Vertex;
@@ -34,6 +22,11 @@ import quarks.topology.TWindow;
import quarks.topology.Topology;
import quarks.topology.spi.AbstractTStream;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static quarks.function.Functions.synchronizedFunction;
+
/**
* A stream that directly adds oplets to the graph.
*
@@ -96,6 +89,35 @@ public class ConnectorStream<G extends Topology, T> extends AbstractTStream<G, T
}
@Override
+ public <E extends Enum<E>> EnumMap<E,TStream<T>> split(Class<E> enumClass, Function<T, E> splitter) {
+
+ E[] es = enumClass.getEnumConstants();
+
+/*
+ List<TStream<T>> outputs = split(es.length, e -> IntStream.range(0, es.length).filter(i-> es[i].equals(splitter.apply(e)))
+ .mapToObj(i -> es[i].ordinal()).findAny().orElse(-1));
+*/
+
+ List<TStream<T>> outputs = split(es.length, new ToIntFunction<T>() {
+ @Override
+ public int applyAsInt(T input) {
+ for(int i = 0; i < es.length; i++){
+ if(es[i].equals(splitter.apply(input))){
+ return es[i].ordinal();
+ }
+ }
+ return -1;
+ }});
+
+ EnumMap<E,TStream<T>> returnMap = new EnumMap<>(enumClass);
+ for (int i = 0; i < es.length ; i++){
+ returnMap.put(es[i], outputs.get(es[i].ordinal()));
+ }
+
+ return returnMap;
+ }
+
+ @Override
public TStream<T> peek(Consumer<T> peeker) {
peeker = Functions.synchronizedConsumer(peeker);
connector.peek(new Peek<T>(peeker));