You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/02/23 19:27:48 UTC

[02/13] samza git commit: SAMZA-1073: moving all operator classes into samza-core

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
deleted file mode 100644
index c4e9f51..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-
-public class TestMessageStreamImplUtil {
-  public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
-    return new MessageStreamImpl<M>(graph);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
deleted file mode 100644
index 9a425d1..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example input {@link MessageEnvelope} w/ Json message and string as the key.
- */
-
-public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> {
-
-  private final String key;
-  private final T data;
-  private final Offset offset;
-  private final SystemStreamPartition partition;
-
-  public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) {
-    this.key = key;
-    this.data = data;
-    this.offset = offset;
-    this.partition = partition;
-  }
-
-  @Override
-  public T getMessage() {
-    return this.data;
-  }
-
-  @Override
-  public String getKey() {
-    return this.key;
-  }
-
-  public Offset getOffset() {
-    return this.offset;
-  }
-
-  public SystemStreamPartition getSystemStreamPartition() {
-    return this.partition;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
deleted file mode 100644
index 361972e..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.hamcrest.core.IsEqual;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-
-public class TestOperatorImpl {
-
-  TestMessageEnvelope curInputMsg;
-  MessageCollector curCollector;
-  TaskCoordinator curCoordinator;
-
-  @Test
-  public void testSubscribers() {
-    this.curInputMsg = null;
-    this.curCollector = null;
-    this.curCoordinator = null;
-    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() {
-      @Override
-      public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
-        TestOperatorImpl.this.curInputMsg = message;
-        TestOperatorImpl.this.curCollector = collector;
-        TestOperatorImpl.this.curCoordinator = coordinator;
-      }
-    };
-    // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext()
-    OperatorImpl mockSub = mock(OperatorImpl.class);
-    opImpl.registerNextOperator(mockSub);
-    TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class);
-    MessageCollector mockCollector = mock(MessageCollector.class);
-    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-    opImpl.propagateResult(xOutput, mockCollector, mockCoordinator);
-    verify(mockSub, times(1)).onNext(
-        argThat(new IsEqual<>(xOutput)),
-        argThat(new IsEqual<>(mockCollector)),
-        argThat(new IsEqual<>(mockCoordinator))
-    );
-    // verify onNext() is invoked correctly
-    TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
-    opImpl.onNext(mockInput, mockCollector, mockCoordinator);
-    assertEquals(mockInput, this.curInputMsg);
-    assertEquals(mockCollector, this.curCollector);
-    assertEquals(mockCoordinator, this.curCoordinator);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
deleted file mode 100644
index 02637a3..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorImpls {
-  Field nextOperatorsField = null;
-  Method createOpMethod = null;
-  Method createOpsMethod = null;
-
-  @Before
-  public void prep() throws NoSuchFieldException, NoSuchMethodException {
-    nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
-    nextOperatorsField.setAccessible(true);
-
-    createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
-        OperatorSpec.class, Config.class, TaskContext.class);
-    createOpMethod.setAccessible(true);
-
-    createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
-    createOpsMethod.setAccessible(true);
-  }
-
-  @Test
-  public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
-    // get window operator
-    WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
-    WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null);
-    when(mockWnd.getWindow()).thenReturn(windowInternal);
-    MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-
-    OperatorGraph opGraph = new OperatorGraph();
-    OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
-        createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
-    assertTrue(opImpl instanceof WindowOperatorImpl);
-    Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
-    wndInternalField.setAccessible(true);
-    WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
-    assertEquals(wndInternal, windowInternal);
-
-    // get simple operator
-    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
-    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
-    when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
-    assertTrue(opImpl instanceof StreamOperatorImpl);
-    Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
-    txfmFnField.setAccessible(true);
-    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
-
-    // get sink operator
-    SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
-    when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
-    assertTrue(opImpl instanceof SinkOperatorImpl);
-    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
-    sinkFnField.setAccessible(true);
-    assertEquals(sinkFn, sinkFnField.get(opImpl));
-
-    // get join operator
-    PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
-    TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
-    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
-    when(joinOp.getTransformFn()).thenReturn(joinFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
-    assertTrue(opImpl instanceof PartialJoinOperatorImpl);
-  }
-
-  @Test
-  public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
-    // test creation of empty chain
-    MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    Config mockConfig = mock(Config.class);
-    OperatorGraph opGraph = new OperatorGraph();
-    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
-    assertTrue(operatorChain != null);
-  }
-
-  @Test
-  public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
-    // test creation of linear chain
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
-    TaskContext mockContext = mock(TaskContext.class);
-    Config mockConfig = mock(Config.class);
-    testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
-    OperatorGraph opGraph = new OperatorGraph();
-    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
-    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
-    assertEquals(subsOps.size(), 1);
-    OperatorImpl wndOpImpl = subsOps.iterator().next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
-    assertEquals(subsOps.size(), 0);
-  }
-
-  @Test
-  public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
-    // test creation of broadcast chain
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
-    TaskContext mockContext = mock(TaskContext.class);
-    Config mockConfig = mock(Config.class);
-    testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
-    testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
-    OperatorGraph opGraph = new OperatorGraph();
-    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
-    assertEquals(subsSet.size(), 2);
-    Iterator<OperatorImpl> iter = subsSet.iterator();
-    // check the first branch w/ flatMap
-    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
-    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
-    assertEquals(subsOps.size(), 1);
-    OperatorImpl flatMapImpl = subsOps.iterator().next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(flatMapImpl);
-    assertEquals(subsOps.size(), 0);
-    // check the second branch w/ map
-    opImpl = iter.next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
-    assertEquals(subsOps.size(), 1);
-    OperatorImpl mapImpl = subsOps.iterator().next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(mapImpl);
-    assertEquals(subsOps.size(), 0);
-  }
-
-  @Test
-  public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
-    // test creation of join chain
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    TaskContext mockContext = mock(TaskContext.class);
-    Config mockConfig = mock(Config.class);
-    input1
-        .join(input2,
-            new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
-              @Override
-              public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
-                return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
-              }
-
-              @Override
-              public String getFirstKey(TestMessageEnvelope message) {
-                return message.getKey();
-              }
-
-              @Override
-              public String getSecondKey(TestMessageEnvelope message) {
-                return message.getKey();
-              }
-            })
-        .map(m -> m);
-    OperatorGraph opGraph = new OperatorGraph();
-    // now, we create chained operators from each input sources
-    RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
-    RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
-    // check that those two chains will merge at map operator
-    // first branch of the join
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
-    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
-    assertEquals(subsOps.size(), 1);
-    // the map operator consumes the common join output, where two branches merge
-    OperatorImpl mapImpl = subsOps.iterator().next();
-    // second branch of the join
-    subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
-    assertNotSame(joinOp1, joinOp2);
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
-    assertEquals(subsOps.size(), 1);
-    // make sure that the map operator is the same
-    assertEquals(mapImpl, subsOps.iterator().next());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
deleted file mode 100644
index ce9fdd2..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestSinkOperatorImpl {
-
-  @Test
-  public void testSinkOperator() {
-    SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
-    SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class);
-    when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext);
-    TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class);
-    MessageCollector mockCollector = mock(MessageCollector.class);
-    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-
-    sinkImpl.onNext(mockMsg, mockCollector, mockCoordinator);
-    verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
deleted file mode 100644
index 010a210..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestStreamOperatorImpl {
-
-  @Test
-  public void testSimpleOperator() {
-    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
-    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
-    when(mockOp.getTransformFn()).thenReturn(txfmFn);
-    MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext));
-    TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
-    TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
-    Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {
-        this.add(outMsg);
-      } };
-    when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
-    MessageCollector mockCollector = mock(MessageCollector.class);
-    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-    opImpl.onNext(inMsg, mockCollector, mockCoordinator);
-    verify(txfmFn, times(1)).apply(inMsg);
-    verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
deleted file mode 100644
index 31257a4..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-
-public class TestOperatorSpecs {
-  @Test
-  public void testGetStreamOperator() {
-    FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
-          this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
-        } };
-    MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
-    assertEquals(strmOp.getTransformFn(), transformFn);
-    assertEquals(strmOp.getNextStream(), mockOutput);
-  }
-
-  @Test
-  public void testGetSinkOperator() {
-    SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
-          TaskCoordinator taskCoordinator) -> { };
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
-    assertEquals(sinkOp.getSinkFn(), sinkFn);
-    assertTrue(sinkOp.getNextStream() == null);
-  }
-
-  @Test
-  public void testGetWindowOperator() throws Exception {
-    Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
-    BiFunction<TestMessageEnvelope, Integer, Integer> aggregator = (m, c) -> c + 1;
-
-    //instantiate a window using reflection
-    WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null);
-
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
-    WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
-    assertEquals(spec.getWindow(), window);
-    assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
-    assertEquals(spec.getWindow().getFoldFunction(), aggregator);
-  }
-
-  @Test
-  public void testGetPartialJoinOperator() {
-    PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
-      new PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope>() {
-        @Override
-        public TestMessageEnvelope apply(MessageEnvelope<Object, ?> m1, MessageEnvelope<Object, ?> m2) {
-          return new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
-        }
-
-        @Override
-        public Object getKey(MessageEnvelope<Object, ?> message) {
-          return message.getKey();
-        }
-
-        @Override
-        public Object getOtherKey(MessageEnvelope<Object, ?> message) {
-          return message.getKey();
-        }
-      };
-
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> joinOutput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
-    PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin =
-        OperatorSpecs.createPartialJoinOperatorSpec(merger, mockGraph, joinOutput);
-
-    assertEquals(partialJoin.getNextStream(), joinOutput);
-    MessageEnvelope<Object, Object> m = mock(MessageEnvelope.class);
-    MessageEnvelope<Object, Object> s = mock(MessageEnvelope.class);
-    assertEquals(partialJoin.getTransformFn(), merger);
-  }
-
-  @Test
-  public void testGetMergeOperator() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
-    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
-    Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
-        this.add(t);
-      } };
-    TestMessageEnvelope t = mock(TestMessageEnvelope.class);
-    assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
-    assertEquals(mergeOp.getNextStream(), output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 813882c..5de30d8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -20,7 +20,6 @@ include \
   'samza-api',
   'samza-elasticsearch',
   'samza-log4j',
-  'samza-operator',
   'samza-rest',
   'samza-shell'