You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/09/24 04:37:56 UTC

[37/50] [abbrv] incubator-apex-core git commit: APEX-28 #resolve

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
deleted file mode 100644
index 78173d8..0000000
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
+++ /dev/null
@@ -1,990 +0,0 @@
-/**
- * Copyright (C) 2015 DataTorrent, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.stram.plan;
-
-import com.datatorrent.common.util.BaseOperator;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-
-import java.util.*;
-
-import javax.validation.*;
-import javax.validation.constraints.AssertTrue;
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
-import javax.validation.constraints.Pattern;
-
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.Maps;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-import com.datatorrent.api.*;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.netlet.util.Slice;
-import com.datatorrent.stram.engine.GenericTestOperator;
-import com.datatorrent.stram.engine.TestGeneratorInputOperator;
-import com.datatorrent.stram.engine.TestNonOptionalOutportInputOperator;
-import com.datatorrent.stram.engine.TestOutputOperator;
-import com.datatorrent.stram.plan.logical.LogicalPlan;
-import com.datatorrent.stram.plan.logical.LogicalPlan;
-import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
-import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
-import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
-
-public class LogicalPlanTest {
-
-  @Test
-  public void testCycleDetection() {
-     LogicalPlan dag = new LogicalPlan();
-
-     //NodeConf operator1 = b.getOrAddNode("operator1");
-     GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
-     GenericTestOperator operator3 = dag.addOperator("operator3", GenericTestOperator.class);
-     GenericTestOperator operator4 = dag.addOperator("operator4", GenericTestOperator.class);
-     //NodeConf operator5 = b.getOrAddNode("operator5");
-     //NodeConf operator6 = b.getOrAddNode("operator6");
-     GenericTestOperator operator7 = dag.addOperator("operator7", GenericTestOperator.class);
-
-     // strongly connect n2-n3-n4-n2
-     dag.addStream("n2n3", operator2.outport1, operator3.inport1);
-
-     dag.addStream("n3n4", operator3.outport1, operator4.inport1);
-
-     dag.addStream("n4n2", operator4.outport1, operator2.inport1);
-
-     // self referencing operator cycle
-     StreamMeta n7n7 = dag.addStream("n7n7", operator7.outport1, operator7.inport1);
-     try {
-       n7n7.addSink(operator7.inport1);
-       fail("cannot add to stream again");
-     } catch (Exception e) {
-       // expected, stream can have single input/output only
-     }
-
-     List<List<String>> cycles = new ArrayList<List<String>>();
-     dag.findStronglyConnected(dag.getMeta(operator7), cycles);
-     assertEquals("operator self reference", 1, cycles.size());
-     assertEquals("operator self reference", 1, cycles.get(0).size());
-     assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0));
-
-     // 3 operator cycle
-     cycles.clear();
-     dag.findStronglyConnected(dag.getMeta(operator4), cycles);
-     assertEquals("3 operator cycle", 1, cycles.size());
-     assertEquals("3 operator cycle", 3, cycles.get(0).size());
-     assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName()));
-     assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName()));
-     assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName()));
-
-     try {
-       dag.validate();
-       fail("validation should fail");
-     } catch (ValidationException e) {
-       // expected
-     }
-
-  }
-
-  public static class ValidationOperator extends BaseOperator {
-    public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>();
-
-    public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>();
-  }
-
-  public static class CounterOperator extends BaseOperator {
-    final public transient InputPort<Object> countInputPort = new DefaultInputPort<Object>() {
-      @Override
-      final public void process(Object payload) {
-      }
-    };
-  }
-
-  @Test
-  public void testLogicalPlanSerialization() throws Exception {
-
-    LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
-
-    ValidationOperator validationNode = dag.addOperator("validationNode", ValidationOperator.class);
-    CounterOperator countGoodNode = dag.addOperator("countGoodNode", CounterOperator.class);
-    CounterOperator countBadNode = dag.addOperator("countBadNode", CounterOperator.class);
-    //ConsoleOutputOperator echoBadNode = dag.addOperator("echoBadNode", ConsoleOutputOperator.class);
-
-    // good tuples to counter operator
-    dag.addStream("goodTuplesStream", validationNode.goodOutputPort, countGoodNode.countInputPort);
-
-    // bad tuples to separate stream and echo operator
-    // (stream with 2 outputs)
-    dag.addStream("badTuplesStream", validationNode.badOutputPort, countBadNode.countInputPort);
-
-    Assert.assertEquals("number root operators", 1, dag.getRootOperators().size());
-    Assert.assertEquals("root operator id", "validationNode", dag.getRootOperators().get(0).getName());
-
-    dag.getContextAttributes(countGoodNode).put(OperatorContext.SPIN_MILLIS, 10);
-
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    LogicalPlan.write(dag, bos);
-
-    // System.out.println("serialized size: " + bos.toByteArray().length);
-
-    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-    LogicalPlan dagClone = LogicalPlan.read(bis);
-    Assert.assertNotNull(dagClone);
-    Assert.assertEquals("number operators in clone", dag.getAllOperators().size(), dagClone.getAllOperators().size());
-    Assert.assertEquals("number root operators in clone", 1, dagClone.getRootOperators().size());
-    Assert.assertTrue("root operator in operators", dagClone.getAllOperators().contains(dagClone.getRootOperators().get(0)));
-
-
-    Operator countGoodNodeClone = dagClone.getOperatorMeta("countGoodNode").getOperator();
-    Assert.assertEquals("", new Integer(10), dagClone.getContextAttributes(countGoodNodeClone).get(OperatorContext.SPIN_MILLIS));
-
-  }
-
-  @Test
-  public void testDeleteOperator()
-  {
-    LogicalPlan dag = new LogicalPlan();
-    TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
-    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
-    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.addStream("s0", input.outport, o1.inport1);
-    StreamMeta s1 = dag.addStream("s1", o1.outport1, o2.inport1);
-    dag.validate();
-    Assert.assertEquals("", 3, dag.getAllOperators().size());
-
-    dag.removeOperator(o2);
-    s1.remove();
-    dag.validate();
-    Assert.assertEquals("", 2, dag.getAllOperators().size());
-  }
-
-  public static class ValidationTestOperator extends BaseOperator implements InputOperator {
-    @NotNull
-    @Pattern(regexp=".*malhar.*", message="Value has to contain 'malhar'!")
-    private String stringField1;
-
-    @Min(2)
-    private int intField1;
-
-    @AssertTrue(message="stringField1 should end with intField1")
-    private boolean isValidConfiguration() {
-      return stringField1.endsWith(String.valueOf(intField1));
-    }
-
-    private String getterProperty2 = "";
-
-    @NotNull
-    public String getProperty2() {
-      return getterProperty2;
-    }
-
-    public void setProperty2(String s) {
-      // annotations need to be on the getter
-      getterProperty2 = s;
-    }
-
-    private String[] stringArrayField;
-
-    public String[] getStringArrayField() {
-      return stringArrayField;
-    }
-
-    public void setStringArrayField(String[] stringArrayField) {
-      this.stringArrayField = stringArrayField;
-    }
-
-    public class Nested {
-      @NotNull
-      private String property = "";
-
-      public String getProperty() {
-        return property;
-      }
-
-      public void setProperty(String property) {
-        this.property = property;
-      }
-
-    }
-
-    @Valid
-    private final Nested nestedBean = new Nested();
-
-    private String stringProperty2;
-
-    public String getStringProperty2() {
-      return stringProperty2;
-    }
-
-    public void setStringProperty2(String stringProperty2) {
-      this.stringProperty2 = stringProperty2;
-    }
-
-    private Map<String, String> mapProperty = Maps.newHashMap();
-
-    public Map<String, String> getMapProperty()
-    {
-      return mapProperty;
-    }
-
-    public void setMapProperty(Map<String, String> mapProperty)
-    {
-      this.mapProperty = mapProperty;
-    }
-
-    @Override
-    public void emitTuples() {
-      // Emit no tuples
-
-    }
-
-  }
-
-  @Test
-  public void testOperatorValidation() {
-
-    ValidationTestOperator bean = new ValidationTestOperator();
-    bean.stringField1 = "malhar1";
-    bean.intField1 = 1;
-
-    // ensure validation standalone produces expected results
-    ValidatorFactory factory =
-        Validation.buildDefaultValidatorFactory();
-    Validator validator = factory.getValidator();
-    Set<ConstraintViolation<ValidationTestOperator>> constraintViolations =
-             validator.validate(bean);
-    //for (ConstraintViolation<ValidationTestOperator> cv : constraintViolations) {
-    //  System.out.println("validation error: " + cv);
-    //}
-    Assert.assertEquals("" + constraintViolations,1, constraintViolations.size());
-    ConstraintViolation<ValidationTestOperator> cv = constraintViolations.iterator().next();
-    Assert.assertEquals("", bean.intField1, cv.getInvalidValue());
-    Assert.assertEquals("", "intField1", cv.getPropertyPath().toString());
-
-    // ensure DAG validation produces matching results
-    LogicalPlan dag = new LogicalPlan();
-    bean = dag.addOperator("testOperator", bean);
-
-    try {
-      dag.validate();
-      Assert.fail("should throw ConstraintViolationException");
-    } catch (ConstraintViolationException e) {
-      Assert.assertEquals("violation details", constraintViolations, e.getConstraintViolations());
-      String expRegex = ".*ValidationTestOperator\\{name=null}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]";
-      Assert.assertThat("exception message", e.getMessage(), RegexMatcher.matches(expRegex));
-    }
-
-    try {
-      bean.intField1 = 3;
-      dag.validate();
-      Assert.fail("should throw ConstraintViolationException");
-    } catch (ConstraintViolationException e) {
-      ConstraintViolation<?> cv2 = e.getConstraintViolations().iterator().next();
-      Assert.assertEquals("" + e.getConstraintViolations(), 1, constraintViolations.size());
-      Assert.assertEquals("", false, cv2.getInvalidValue());
-      Assert.assertEquals("", "validConfiguration", cv2.getPropertyPath().toString());
-    }
-    bean.stringField1 = "malhar3";
-
-    // annotated getter
-    try {
-      bean.getterProperty2 = null;
-      dag.validate();
-      Assert.fail("should throw ConstraintViolationException");
-    } catch (ConstraintViolationException e) {
-      ConstraintViolation<?> cv2 = e.getConstraintViolations().iterator().next();
-      Assert.assertEquals("" + e.getConstraintViolations(), 1, constraintViolations.size());
-      Assert.assertEquals("", null, cv2.getInvalidValue());
-      Assert.assertEquals("", "property2", cv2.getPropertyPath().toString());
-    }
-    bean.getterProperty2 = "";
-
-    // nested property
-    try {
-      bean.nestedBean.property = null;
-      dag.validate();
-      Assert.fail("should throw ConstraintViolationException");
-    } catch (ConstraintViolationException e) {
-      ConstraintViolation<?> cv2 = e.getConstraintViolations().iterator().next();
-      Assert.assertEquals("" + e.getConstraintViolations(), 1, constraintViolations.size());
-      Assert.assertEquals("", null, cv2.getInvalidValue());
-      Assert.assertEquals("", "nestedBean.property", cv2.getPropertyPath().toString());
-    }
-    bean.nestedBean.property = "";
-
-    // all valid
-    dag.validate();
-
-  }
-
-  @OperatorAnnotation(partitionable = false)
-  public static class TestOperatorAnnotationOperator extends BaseOperator {
-
-    @InputPortFieldAnnotation( optional = true)
-    final public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>() {
-      @Override
-      public void process(Object tuple) {
-      }
-    };
-  }
-
-  class NoInputPortOperator extends BaseOperator {
-  }
-
-  @Test
-  public void testValidationForNonInputRootOperator() {
-    LogicalPlan dag = new LogicalPlan();
-    NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator());
-    try {
-      dag.validate();
-      Assert.fail("should fail because root operator is not input operator");
-    } catch (ValidationException e) {
-      // expected
-    }
-  }
-
-  @OperatorAnnotation(partitionable = false)
-  public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2> {
-
-    @Override
-    public Collection<Partition<TestOperatorAnnotationOperator2>> definePartitions(Collection<Partition<TestOperatorAnnotationOperator2>> partitions, PartitioningContext context)
-    {
-      return null;
-    }
-
-    @Override
-    public void partitioned(Map<Integer, Partition<TestOperatorAnnotationOperator2>> partitions)
-    {
-    }
-  }
-
-  @Test
-  public void testOperatorAnnotation() {
-    LogicalPlan dag = new LogicalPlan();
-    TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
-    TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class);
-    dag.addStream("Connection", input.outport, operator.input1);
-
-
-    dag.setAttribute(operator, OperatorContext.PARTITIONER, new StatelessPartitioner<TestOperatorAnnotationOperator>(2));
-
-    try {
-      dag.validate();
-      Assert.fail("should raise operator is not partitionable for operator1");
-    } catch (ValidationException e) {
-      Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
-    }
-
-    dag.setAttribute(operator, OperatorContext.PARTITIONER, null);
-    dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, true);
-
-    try {
-      dag.validate();
-      Assert.fail("should raise operator is not partitionable for operator1");
-    } catch (ValidationException e) {
-      Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e.getMessage());
-    }
-
-    dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, false);
-    dag.validate();
-
-    dag.removeOperator(operator);
-    TestOperatorAnnotationOperator2 operator2 = dag.addOperator("operator2", TestOperatorAnnotationOperator2.class);
-
-    try {
-      dag.validate();
-      Assert.fail("should raise operator is not partitionable for operator2");
-    } catch (ValidationException e) {
-      Assert.assertEquals("Operator " + dag.getMeta(operator2).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
-    }
-  }
-
-  @Test
-  public void testPortConnectionValidation() {
-
-    LogicalPlan dag = new LogicalPlan();
-
-    TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class);
-
-    try {
-      dag.validate();
-      Assert.fail("should raise port not connected for input1.outputPort1");
-
-    } catch (ValidationException e) {
-      Assert.assertEquals("", "Output port connection required: input1.outport1", e.getMessage());
-    }
-
-    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
-    dag.addStream("stream1", input.outport1, o1.inport1);
-    dag.validate();
-
-    // required input
-    dag.addOperator("counter", CounterOperator.class);
-    try {
-      dag.validate();
-    } catch (ValidationException e) {
-      Assert.assertEquals("", "Input port connection required: counter.countInputPort", e.getMessage());
-    }
-
-  }
-
-  @Test
-  public void testAtMostOnceProcessingModeValidation() {
-    LogicalPlan dag = new LogicalPlan();
-
-    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
-    TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
-
-    GenericTestOperator amoOper = dag.addOperator("amoOper", GenericTestOperator.class);
-    dag.setAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
-
-    dag.addStream("input1.outport", input1.outport, amoOper.inport1);
-    dag.addStream("input2.outport", input2.outport, amoOper.inport2);
-
-    GenericTestOperator outputOper = dag.addOperator("outputOper", GenericTestOperator.class);
-    dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
-    dag.addStream("aloOper.outport1", amoOper.outport1, outputOper.inport1);
-
-    try {
-      dag.validate();
-      Assert.fail("Exception expected for " + outputOper);
-    } catch (ValidationException ve) {
-      Assert.assertEquals("", ve.getMessage(), "Processing mode outputOper/AT_LEAST_ONCE not valid for source amoOper/AT_MOST_ONCE");
-    }
-    dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, null);
-    dag.validate();
-
-    OperatorMeta outputOperOm = dag.getMeta(outputOper);
-    Assert.assertEquals("" + outputOperOm.getAttributes(), Operator.ProcessingMode.AT_MOST_ONCE, outputOperOm.getValue(OperatorContext.PROCESSING_MODE));
-
-  }
-
-    @Test
-  public void testExactlyOnceProcessingModeValidation() {
-    LogicalPlan dag = new LogicalPlan();
-
-    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
-    TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
-
-    GenericTestOperator amoOper = dag.addOperator("amoOper", GenericTestOperator.class);
-    dag.setAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.EXACTLY_ONCE);
-
-    dag.addStream("input1.outport", input1.outport, amoOper.inport1);
-    dag.addStream("input2.outport", input2.outport, amoOper.inport2);
-
-    GenericTestOperator outputOper = dag.addOperator("outputOper", GenericTestOperator.class);
-    dag.addStream("aloOper.outport1", amoOper.outport1, outputOper.inport1);
-
-    try {
-      dag.validate();
-      Assert.fail("Exception expected for " + outputOper);
-    } catch (ValidationException ve) {
-      Assert.assertEquals("", ve.getMessage(), "Processing mode for outputOper should be AT_MOST_ONCE for source amoOper/EXACTLY_ONCE");
-    }
-
-    dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
-
-    try {
-      dag.validate();
-      Assert.fail("Exception expected for " + outputOper);
-    } catch (ValidationException ve) {
-      Assert.assertEquals("", ve.getMessage(), "Processing mode outputOper/AT_LEAST_ONCE not valid for source amoOper/EXACTLY_ONCE");
-    }
-
-    // AT_MOST_ONCE is valid
-    dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
-    dag.validate();
-  }
-
-  @Test
-  public void testLocalityValidation() {
-    LogicalPlan dag = new LogicalPlan();
-
-    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
-    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
-    StreamMeta s1 = dag.addStream("input1.outport", input1.outport, o1.inport1).setLocality(Locality.THREAD_LOCAL);
-    dag.validate();
-
-    TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
-    dag.addStream("input2.outport", input2.outport, o1.inport2);
-
-    try {
-      dag.validate();
-      Assert.fail("Exception expected for " + o1);
-    } catch (ValidationException ve) {
-      Assert.assertThat("", ve.getMessage(), RegexMatcher.matches("Locality THREAD_LOCAL invalid for operator .* with multiple input streams .*"));
-    }
-
-    s1.setLocality(null);
-    dag.validate();
-  }
-
-  private class TestAnnotationsOperator extends BaseOperator implements InputOperator {
-    //final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
-
-    @OutputPortFieldAnnotation( optional=false)
-    final public transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
-
-    @Override
-    public void emitTuples() {
-      // Emit Nothing
-
-    }
-  }
-
-  private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator{
-    // multiple ports w/o annotation, one of them must be connected
-    final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
-
-    @Override
-    public void emitTuples() {
-      // Emit Nothing
-
-    }
-  }
-
-  private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator{
-    // multiple ports w/o annotation, one of them must be connected
-    @OutputPortFieldAnnotation( optional=true)
-    final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
-    @OutputPortFieldAnnotation( optional=true)
-    final public transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
-    @Override
-    public void emitTuples() {
-      // Emit Nothing
-
-    }
-  }
-
-  @Test
-  public void testOutputPortAnnotation() {
-    LogicalPlan dag = new LogicalPlan();
-    TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
-
-    try {
-      dag.validate();
-      Assert.fail("should raise: port connection required");
-    } catch (ValidationException e) {
-      Assert.assertEquals("", "Output port connection required: testAnnotationsOperator.outport2", e.getMessage());
-    }
-
-    TestOutputOperator o2 = dag.addOperator("sink", new TestOutputOperator());
-    dag.addStream("s1", ta1.outport2, o2.inport);
-
-    dag.validate();
-
-    TestAnnotationsOperator2 ta2 = dag.addOperator("multiOutputPorts1", new TestAnnotationsOperator2());
-
-    try {
-      dag.validate();
-      Assert.fail("should raise: At least one output port must be connected");
-    } catch (ValidationException e) {
-      Assert.assertEquals("", "At least one output port must be connected: multiOutputPorts1", e.getMessage());
-    }
-    TestOutputOperator o3 = dag.addOperator("o3", new TestOutputOperator());
-    dag.addStream("s2", ta2.outport1, o3.inport);
-
-    dag.addOperator("multiOutputPorts3", new TestAnnotationsOperator3());
-    dag.validate();
-
-  }
-
-  /**
-   * Operator that can be used with default Java serialization instead of Kryo
-   */
-  @DefaultSerializer(JavaSerializer.class)
-  public static class JdkSerializableOperator extends BaseOperator implements Serializable {
-    private static final long serialVersionUID = -4024202339520027097L;
-
-    public abstract class SerializableInputPort<T> implements InputPort<T>, Sink<T>, java.io.Serializable {
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public Sink<T> getSink() {
-        return this;
-      }
-
-      @Override
-      public void setConnected(boolean connected) {
-      }
-
-      @Override
-      public void setup(PortContext context)
-      {
-      }
-
-      @Override
-      public void teardown()
-      {
-      }
-
-      @Override
-      public StreamCodec<T> getStreamCodec() {
-        return null;
-      }
-    }
-
-    @InputPortFieldAnnotation( optional=true)
-    final public InputPort<Object> inport1 = new SerializableInputPort<Object>() {
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      final public void put(Object payload)
-      {
-      }
-
-      @Override
-      public int getCount(boolean reset)
-      {
-        return 0;
-      }
-
-    };
-  }
-
-  @Test
-  public void testJdkSerializableOperator() throws Exception {
-    LogicalPlan dag = new LogicalPlan();
-    dag.addOperator("o1", new JdkSerializableOperator());
-
-    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-    LogicalPlan.write(dag, outStream);
-    outStream.close();
-
-    LogicalPlan clonedDag = LogicalPlan.read(new ByteArrayInputStream(outStream.toByteArray()));
-    JdkSerializableOperator o1Clone = (JdkSerializableOperator)clonedDag.getOperatorMeta("o1").getOperator();
-    Assert.assertNotNull("port object null", o1Clone.inport1);
-  }
-
-  private static class TestStreamCodec implements StreamCodec<Object> {
-    @Override
-    public Object fromByteArray(Slice fragment)
-    {
-      return fragment.stringValue();
-    }
-
-    @Override
-    public Slice toByteArray(Object o)
-    {
-      byte[] b = o.toString().getBytes();
-      return new Slice(b, 0, b.length);
-    }
-
-    @Override
-    public int getPartition(Object o)
-    {
-      return o.hashCode() / 2;
-    }
-  }
-
-  public static class TestPortCodecOperator extends BaseOperator {
-    public transient final DefaultInputPort<Object> inport1 = new DefaultInputPort<Object>()
-    {
-      @Override
-      public void process(Object tuple)
-      {
-
-      }
-
-      @Override
-      public StreamCodec<Object> getStreamCodec()
-      {
-        return new TestStreamCodec();
-      }
-    };
-
-    @OutputPortFieldAnnotation( optional = true)
-    public transient final DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
-  }
-
-  /*
-  @Test
-  public void testStreamCodec() throws Exception {
-    LogicalPlan dag = new LogicalPlan();
-    TestGeneratorInputOperator input = dag.addOperator("input", TestGeneratorInputOperator.class);
-    GenericTestOperator gto1 = dag.addOperator("gto1", GenericTestOperator.class);
-    StreamMeta stream1 = dag.addStream("s1", input.outport, gto1.inport1);
-    StreamCodec<?> codec1 = new TestStreamCodec();
-    dag.setInputPortAttribute(gto1.inport1, PortContext.STREAM_CODEC, codec1);
-    dag.validate();
-    //Assert.assertEquals("Stream codec not set", stream1.getStreamCodec(), codec1);
-
-    GenericTestOperator gto2 = dag.addOperator("gto2", GenericTestOperator.class);
-    GenericTestOperator gto3 = dag.addOperator("gto3", GenericTestOperator.class);
-    StreamMeta stream2 = dag.addStream("s2", gto1.outport1, gto2.inport1, gto3.inport1);
-    dag.setInputPortAttribute(gto2.inport1, PortContext.STREAM_CODEC, codec1);
-    try {
-      dag.validate();
-    } catch (ValidationException e) {
-      String msg = e.getMessage();
-      if (!msg.startsWith("Stream codec not set on input port") || !msg.contains("gto3")
-              || !msg.contains(codec1.toString()) || !msg.endsWith("was specified on another port")) {
-        Assert.fail(String.format("LogicalPlan validation error msg: %s", msg));
-      }
-    }
-
-    dag.setInputPortAttribute(gto3.inport1, PortContext.STREAM_CODEC, codec1);
-    dag.validate();
-    //Assert.assertEquals("Stream codec not set", stream2.getStreamCodec(), codec1);
-
-    StreamCodec<?> codec2 = new TestStreamCodec();
-    dag.setInputPortAttribute(gto3.inport1, PortContext.STREAM_CODEC, codec2);
-    try {
-      dag.validate();
-    } catch (ValidationException e) {
-      String msg = e.getMessage();
-      if (!msg.startsWith("Conflicting stream codec set on input port") || !msg.contains("gto3")
-              || !msg.contains(codec2.toString()) || !msg.endsWith("was specified on another port")) {
-        Assert.fail(String.format("LogicalPlan validation error msg: %s", msg));
-      }
-    }
-
-    dag.setInputPortAttribute(gto3.inport1, PortContext.STREAM_CODEC, codec1);
-    TestPortCodecOperator pco = dag.addOperator("pco", TestPortCodecOperator.class);
-    StreamMeta stream3 = dag.addStream("s3", gto2.outport1, pco.inport1);
-    dag.validate();
-    //Assert.assertEquals("Stream codec class not set", stream3.getCodecClass(), TestStreamCodec.class);
-
-    dag.setInputPortAttribute(pco.inport1, PortContext.STREAM_CODEC, codec2);
-    dag.validate();
-    //Assert.assertEquals("Stream codec not set", stream3.getStreamCodec(), codec2);
-  }
-  */
-
-  @Test
-  public void testCheckpointableWithinAppWindowAnnotation()
-  {
-    LogicalPlan dag = new LogicalPlan();
-    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
-    GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
-    dag.addStream("Stream1", input1.outport, x.inport1);
-    dag.setAttribute(x, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
-    dag.setAttribute(x, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
-    dag.validate();
-
-    TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
-    CheckpointableWithinAppWindowOperator y = dag.addOperator("y", new CheckpointableWithinAppWindowOperator());
-    dag.addStream("Stream2", input2.outport, y.inport1);
-    dag.setAttribute(y, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
-    dag.setAttribute(y, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
-    dag.validate();
-
-    TestGeneratorInputOperator input3 = dag.addOperator("input3", TestGeneratorInputOperator.class);
-    NotCheckpointableWithinAppWindowOperator z = dag.addOperator("z", new NotCheckpointableWithinAppWindowOperator());
-    dag.addStream("Stream3", input3.outport, z.inport1);
-    dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
-    dag.setAttribute(z, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
-    try {
-      dag.validate();
-      Assert.fail("should fail because chekpoint window count is not a factor of application window count");
-    }
-    catch (ValidationException e) {
-      // expected
-    }
-
-    dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 30);
-    dag.validate();
-
-    dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 45);
-    try {
-      dag.validate();
-      Assert.fail("should fail because chekpoint window count is not a factor of application window count");
-    }
-    catch (ValidationException e) {
-      // expected
-    }
-  }
-
-  @OperatorAnnotation(checkpointableWithinAppWindow = true)
-  class CheckpointableWithinAppWindowOperator extends GenericTestOperator
-  {
-  }
-
-  @OperatorAnnotation(checkpointableWithinAppWindow = false)
-  class NotCheckpointableWithinAppWindowOperator extends GenericTestOperator
-  {
-  }
-
-  @Test
-  public void testInputPortHiding()
-  {
-    LogicalPlan dag = new LogicalPlan();
-    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
-    Operator2 operator2 = dag.addOperator("operator2", new Operator2());
-    dag.addStream("Stream1", input1.outport, operator2.input);
-    dag.validate();
-  }
-
-  @Test
-  public void testInvalidInputPortConnection()
-  {
-    LogicalPlan dag = new LogicalPlan();
-    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
-    Operator1 operator1 = dag.addOperator("operator3", new Operator3());
-    dag.addStream("Stream1", input1.outport, operator1.input);
-    try {
-      dag.validate();
-    } catch (ValidationException ex) {
-      Assert.assertTrue("validation message", ex.getMessage().startsWith("Invalid port connected"));
-      return;
-    }
-    Assert.fail();
-  }
-
-  class Operator1 extends BaseOperator
-  {
-    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
-    {
-      @Override
-      public void process(Object tuple)
-      {
-
-      }
-    };
-  }
-
-  class Operator2 extends Operator1
-  {
-    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
-    {
-      @Override
-      public void process(Object tuple)
-      {
-
-      }
-    };
-  }
-
-  class Operator3 extends Operator1
-  {
-    @InputPortFieldAnnotation(optional = true)
-    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
-    {
-      @Override
-      public void process(Object tuple)
-      {
-
-      }
-    };
-  }
-
-  @Test
-  public void testOutputPortHiding()
-  {
-    LogicalPlan dag = new LogicalPlan();
-    Operator5 operator5 = dag.addOperator("input", new Operator5());
-    Operator2 operator2 = dag.addOperator("operator2", new Operator2());
-    dag.addStream("Stream1", operator5.output, operator2.input);
-    dag.validate();
-  }
-
-  @Test(expected = ValidationException.class)
-  public void testInvalidOutputPortConnection()
-  {
-    LogicalPlan dag = new LogicalPlan();
-    Operator4 operator4 = dag.addOperator("input", new Operator5());
-    Operator3 operator3 = dag.addOperator("operator3", new Operator3());
-    dag.addStream("Stream1", operator4.output, operator3.input);
-    dag.validate();
-  }
-
-  class Operator4 extends BaseOperator implements InputOperator
-  {
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
-
-    @Override
-    public void emitTuples()
-    {
-
-    }
-  }
-
-  class Operator5 extends Operator4
-  {
-    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
-  }
-
-  /*
-  These were tests for operator semantics that verified if an operator class implements InputOperator then the same class should not declare input ports.
-  This would be done later when we are able to verify user code at compile-time.
-
-    validation()
-  {
-    if (n.getOperator() instanceof InputOperator) {
-      try {
-        for (Class<?> clazz : n.getOperator().getClass().getInterfaces()) {
-          if (clazz.getName().equals(InputOperator.class.getName())) {
-            for (Field field : n.getOperator().getClass().getDeclaredFields()) {
-              field.setAccessible(true);
-              Object declaredObject = field.get(n.getOperator());
-              if (declaredObject instanceof InputPort) {
-                throw new ValidationException("Operator class implements InputOperator and also declares input ports: " + n.name);
-              }
-            }
-            break;
-          }
-        }
-      }
-      catch (IllegalAccessException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-  @Test
-  public void testInvalidInputOperatorDeclaration()
-  {
-    LogicalPlan dag = new LogicalPlan();
-
-    TestGeneratorInputOperator.InvalidInputOperator inputOperator = dag.addOperator("input", new TestGeneratorInputOperator.InvalidInputOperator());
-    GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
-
-    dag.addStream("stream1", inputOperator.outport, operator2.inport1);
-
-    try {
-      dag.validate();
-      fail("validation should fail");
-    }
-    catch (ValidationException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testValidInputOperatorDeclaration()
-  {
-    LogicalPlan dag = new LogicalPlan();
-
-    TestGeneratorInputOperator.ValidGenericOperator operator1 = dag.addOperator("input", new TestGeneratorInputOperator.ValidGenericOperator());
-    GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
-
-    dag.addStream("stream1", operator1.outport, operator2.inport1);
-    dag.validate();
-  }
-  */
-}