You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/08/16 19:43:21 UTC

[1/2] apex-core git commit: APEXCORE-448 made operator name available in the operator context

Repository: apex-core
Updated Branches:
  refs/heads/master a42c67bc2 -> 3119aba89


APEXCORE-448 made operator name available in the operator context


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9bcde2e3
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9bcde2e3
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9bcde2e3

Branch: refs/heads/master
Commit: 9bcde2e306ce7f6c3199112ad5bb737370f6624e
Parents: 130ce6b
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Aug 5 21:19:22 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Aug 5 21:19:22 2016 -0700

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  |  5 ++
 .../stram/api/OperatorDeployInfo.java           |  6 ++
 .../stram/engine/OperatorContext.java           | 15 +++-
 .../stram/engine/StreamingContainer.java        |  5 +-
 .../stram/engine/AtMostOnceTest.java            |  3 +-
 .../stram/engine/GenericNodeTest.java           | 14 ++--
 .../datatorrent/stram/engine/InputNodeTest.java |  2 +-
 .../com/datatorrent/stram/engine/NodeTest.java  | 13 ++--
 .../stram/engine/OperatorContextTest.java       | 79 ++++++++++++++++++++
 .../stram/engine/ProcessingModeTests.java       |  2 +-
 .../stram/stream/InlineStreamTest.java          |  6 +-
 11 files changed, 130 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index a0f3ad3..187bf08 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -323,6 +323,11 @@ public interface Context
     int getId();
 
     /**
+     * @return the logical operator name which was used to add the operator in tha DAG.
+     */
+    String getName();
+
+    /**
      * Return the number of windows before the next checkpoint including the current window.
      * @return Number of windows from checkpoint, 1 if the checkpoint will be after the current window
      */

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
index bccbedf..219017b 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
@@ -52,6 +52,12 @@ public class OperatorDeployInfo implements Serializable, OperatorContext
   }
 
   @Override
+  public String getName()
+  {
+    return name;
+  }
+
+  @Override
   public AttributeMap getAttributes()
   {
     return contextAttributes;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
index 424ffcc..7113280 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java
@@ -21,9 +21,13 @@ package com.datatorrent.stram.engine;
 import java.util.Collection;
 import java.util.concurrent.BlockingQueue;
 
+import javax.validation.constraints.NotNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.StatsListener.OperatorRequest;
@@ -44,6 +48,7 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont
   private Thread thread;
   private long lastProcessedWindowId;
   private final int id;
+  private final String name;
   // the size of the circular queue should be configurable. hardcoded to 1024 for now.
   private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<ContainerStats.OperatorStats>(1024);
   private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<OperatorRequest>(1024);
@@ -81,14 +86,16 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont
   /**
    *
    * @param id the value of id
+   * @param name name of the operator
    * @param attributes the value of attributes
    * @param parentContext
    */
-  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
+  public OperatorContext(int id, @NotNull String name, AttributeMap attributes, Context parentContext)
   {
     super(attributes, parentContext);
     this.lastProcessedWindowId = Stateless.WINDOW_ID;
     this.id = id;
+    this.name = Preconditions.checkNotNull(name, "operator name");
     this.stateless = super.getValue(OperatorContext.STATELESS);
   }
 
@@ -99,6 +106,12 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont
   }
 
   @Override
+  public String getName()
+  {
+    return name;
+  }
+
+  @Override
   public int getWindowsFromCheckpoint()
   {
     return windowsFromCheckpoint;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 54b8a6e..27688e3 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -902,14 +902,15 @@ public class StreamingContainer extends YarnContainerMain
 
       Context parentContext;
       if (ndi instanceof UnifierDeployInfo) {
-        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
+        OperatorContext unifiedOperatorContext = new OperatorContext(0, ndi.name,
+            ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
         parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
         massageUnifierDeployInfo(ndi);
       } else {
         parentContext = containerContext;
       }
 
-      OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
+      OperatorContext ctx = new OperatorContext(ndi.id, ndi.name, ndi.contextAttributes, parentContext);
       ctx.attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, ndi.checkpoint.windowId);
       logger.debug("Restoring operator {} to checkpoint {} stateless={}.", ndi.id, Codec.getStringWindowId(ndi.checkpoint.windowId), ctx.stateless);
       Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
index b1296c8..cc777f7 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java
@@ -83,7 +83,8 @@ public class AtMostOnceTest extends ProcessingModeTests
     map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
     map.put(OperatorContext.PROCESSING_MODE, processingMode);
 
-    final GenericNode node = new GenericNode(new MultiInputOperator(), new com.datatorrent.stram.engine.OperatorContext(1, map, null));
+    final GenericNode node = new GenericNode(new MultiInputOperator(),
+        new com.datatorrent.stram.engine.OperatorContext(1, "operator", map, null));
     AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024);
     AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024);
     node.connectInputPort("input1", reservoir1);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index b7f6362..5dfa5f3 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -145,9 +145,9 @@ public class GenericNodeTest
 
     public volatile List<Checkpoint> checkpoints = Lists.newArrayList();
 
-    public TestStatsOperatorContext(int id, AttributeMap attributes, Context parentContext)
+    public TestStatsOperatorContext(int id, String name, AttributeMap attributes, Context parentContext)
     {
-      super(id, attributes, parentContext);
+      super(id, name, attributes, parentContext);
     }
 
     @Override
@@ -276,7 +276,8 @@ public class GenericNodeTest
     long sleeptime = 25L;
     final ArrayList<Object> list = new ArrayList<Object>();
     GenericOperator go = new GenericOperator();
-    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null));
+    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+        new DefaultAttributeMap(), null));
     gn.setId(1);
     AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
     AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
@@ -397,7 +398,8 @@ public class GenericNodeTest
     long maxSleep = 5000;
     long sleeptime = 25L;
     GenericOperator go = new GenericOperator();
-    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null));
+    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+        new DefaultAttributeMap(), null));
     gn.setId(1);
     AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
     AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
@@ -537,7 +539,7 @@ public class GenericNodeTest
 
     dam.put(OperatorContext.STORAGE_AGENT, storageAgent);
 
-    TestStatsOperatorContext operatorContext = new TestStatsOperatorContext(0, dam, null);
+    TestStatsOperatorContext operatorContext = new TestStatsOperatorContext(0, "operator", dam, null);
     final GenericNode gn = new GenericNode(go, operatorContext);
     gn.setId(1);
 
@@ -635,7 +637,7 @@ public class GenericNodeTest
     DefaultAttributeMap attrMap = new DefaultAttributeMap();
     attrMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, dagCheckPoint);
     attrMap.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, opCheckPoint);
-    final OperatorContext context = new com.datatorrent.stram.engine.OperatorContext(0, attrMap, null);
+    final OperatorContext context = new com.datatorrent.stram.engine.OperatorContext(0, "operator", attrMap, null);
     final GenericNode gn = new GenericNode(go, context);
     gn.setId(1);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
index 1c51abb..e182b75 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
@@ -69,7 +69,7 @@ public class InputNodeTest
     dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 10);
     dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 10);
 
-    final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+    final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null));
     in.setId(1);
 
     TestSink testSink = new TestSink();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 48a819f..26bd7a0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -206,7 +206,7 @@ public class NodeTest
     attributeMap.put(OperatorContext.STORAGE_AGENT, new StorageAgentImpl());
     attributeMap.put(OperatorContext.STATELESS, true);
     Node<StatelessOperator> node = new Node<StatelessOperator>(new StatelessOperator(),
-        new com.datatorrent.stram.engine.OperatorContext(0, attributeMap, null))
+        new com.datatorrent.stram.engine.OperatorContext(0,"operator", attributeMap, null))
     {
       @Override
       public void connectInputPort(String port, SweepableReservoir reservoir)
@@ -239,18 +239,18 @@ public class NodeTest
     DefaultAttributeMap attributeMap = new DefaultAttributeMap();
     attributeMap.put(OperatorContext.STORAGE_AGENT, new StorageAgentImpl());
     Node<TestGenericOperator> node = new Node<TestGenericOperator>(new TestGenericOperator(),
-        new com.datatorrent.stram.engine.OperatorContext(0, attributeMap, null))
+        new com.datatorrent.stram.engine.OperatorContext(0, "operator", attributeMap, null))
     {
       @Override
       public void connectInputPort(String port, SweepableReservoir reservoir)
       {
-        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        throw new UnsupportedOperationException("Not supported yet.");
       }
 
       @Override
       public void run()
       {
-        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        throw new UnsupportedOperationException("Not supported yet.");
       }
 
     };
@@ -292,9 +292,10 @@ public class NodeTest
     final Node in;
 
     if (trueGenericFalseInput) {
-      in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+      in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null));
     } else {
-      in = new InputNode((InputCheckpointOperator)gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+      in = new InputNode((InputCheckpointOperator)gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+          dam, null));
     }
 
     in.setId(1);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java b/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java
new file mode 100644
index 0000000..aa48bbe
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.engine;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Tests for {@link OperatorContext}
+ */
+public class OperatorContextTest
+{
+  private static CountDownLatch latch = new CountDownLatch(1);
+  private static volatile String operatorName;
+
+  private static class MockInputOperator extends BaseOperator implements InputOperator
+  {
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      operatorName = Preconditions.checkNotNull(context.getName(), "operator name");
+      latch.countDown();
+    }
+
+    @Override
+    public void emitTuples()
+    {
+    }
+  }
+
+  @Test
+  public void testInjectionOfOperatorName() throws Exception
+  {
+    StreamingApplication application = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        dag.addOperator("input", new MockInputOperator());
+      }
+    };
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(application, new Configuration());
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    latch.await();
+    Assert.assertEquals("operator name", "input", operatorName);
+    lc.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
index 55d8d5f..3e208c8 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java
@@ -261,7 +261,7 @@ public class ProcessingModeTests
     map.put(OperatorContext.PROCESSING_MODE, processingMode);
 
     final GenericNode node = new GenericNode(new MultiInputOperator(),
-        new com.datatorrent.stram.engine.OperatorContext(1, map, null));
+        new com.datatorrent.stram.engine.OperatorContext(1, "operator", map, null));
     AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024);
     AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input1", 1024);
     node.connectInputPort("input1", reservoir1);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9bcde2e3/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
index ef63a62..ed145c7 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java
@@ -56,12 +56,14 @@ public class InlineStreamTest
     final int totalTupleCount = 5000;
 
     final PassThroughNode<Object> operator1 = new PassThroughNode<Object>();
-    final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, new DefaultAttributeMap(), null));
+    final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, "operator1", new DefaultAttributeMap(),
+        null));
     node1.setId(1);
     operator1.setup(node1.context);
 
     final PassThroughNode<Object> operator2 = new PassThroughNode<Object>();
-    final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, new DefaultAttributeMap(), null));
+    final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, "operator2", new DefaultAttributeMap(),
+        null));
     node2.setId(2);
     operator2.setup(node2.context);
 


[2/2] apex-core git commit: Merge branch 'APEXCORE-448' of https://github.com/chandnisingh/incubator-apex-core into APEXCORE-448

Posted by vr...@apache.org.
Merge branch 'APEXCORE-448' of https://github.com/chandnisingh/incubator-apex-core into APEXCORE-448


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3119aba8
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3119aba8
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3119aba8

Branch: refs/heads/master
Commit: 3119aba891676d936c76cd36acf5fb6833547a13
Parents: a42c67b 9bcde2e
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Tue Aug 16 12:42:02 2016 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Aug 16 12:42:02 2016 -0700

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  |  5 ++
 .../stram/api/OperatorDeployInfo.java           |  6 ++
 .../stram/engine/OperatorContext.java           | 15 +++-
 .../stram/engine/StreamingContainer.java        |  5 +-
 .../stram/engine/AtMostOnceTest.java            |  3 +-
 .../stram/engine/GenericNodeTest.java           | 14 ++--
 .../datatorrent/stram/engine/InputNodeTest.java |  2 +-
 .../com/datatorrent/stram/engine/NodeTest.java  | 13 ++--
 .../stram/engine/OperatorContextTest.java       | 79 ++++++++++++++++++++
 .../stram/engine/ProcessingModeTests.java       |  2 +-
 .../stram/stream/InlineStreamTest.java          |  6 +-
 11 files changed, 130 insertions(+), 20 deletions(-)
----------------------------------------------------------------------