You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2015/09/17 06:58:56 UTC
[3/8] incubator-apex-core git commit: APEX-28 #resolve
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
new file mode 100644
index 0000000..94dce6c
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -0,0 +1,988 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import com.datatorrent.common.util.BaseOperator;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.*;
+
+import javax.validation.*;
+import javax.validation.constraints.AssertTrue;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.Maps;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.api.*;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.netlet.util.Slice;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+import com.datatorrent.stram.engine.TestNonOptionalOutportInputOperator;
+import com.datatorrent.stram.engine.TestOutputOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
+import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
+import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
+
+public class LogicalPlanTest {
+
+ @Test
+ public void testCycleDetection() {
+ LogicalPlan dag = new LogicalPlan();
+
+ //NodeConf operator1 = b.getOrAddNode("operator1");
+ GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
+ GenericTestOperator operator3 = dag.addOperator("operator3", GenericTestOperator.class);
+ GenericTestOperator operator4 = dag.addOperator("operator4", GenericTestOperator.class);
+ //NodeConf operator5 = b.getOrAddNode("operator5");
+ //NodeConf operator6 = b.getOrAddNode("operator6");
+ GenericTestOperator operator7 = dag.addOperator("operator7", GenericTestOperator.class);
+
+ // strongly connect n2-n3-n4-n2
+ dag.addStream("n2n3", operator2.outport1, operator3.inport1);
+
+ dag.addStream("n3n4", operator3.outport1, operator4.inport1);
+
+ dag.addStream("n4n2", operator4.outport1, operator2.inport1);
+
+ // self referencing operator cycle
+ StreamMeta n7n7 = dag.addStream("n7n7", operator7.outport1, operator7.inport1);
+ try {
+ n7n7.addSink(operator7.inport1);
+ fail("cannot add to stream again");
+ } catch (Exception e) {
+ // expected, stream can have single input/output only
+ }
+
+ List<List<String>> cycles = new ArrayList<List<String>>();
+ dag.findStronglyConnected(dag.getMeta(operator7), cycles);
+ assertEquals("operator self reference", 1, cycles.size());
+ assertEquals("operator self reference", 1, cycles.get(0).size());
+ assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0));
+
+ // 3 operator cycle
+ cycles.clear();
+ dag.findStronglyConnected(dag.getMeta(operator4), cycles);
+ assertEquals("3 operator cycle", 1, cycles.size());
+ assertEquals("3 operator cycle", 3, cycles.get(0).size());
+ assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName()));
+ assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName()));
+ assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName()));
+
+ try {
+ dag.validate();
+ fail("validation should fail");
+ } catch (ValidationException e) {
+ // expected
+ }
+
+ }
+
+ public static class ValidationOperator extends BaseOperator {
+ public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>();
+
+ public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>();
+ }
+
+ public static class CounterOperator extends BaseOperator {
+ final public transient InputPort<Object> countInputPort = new DefaultInputPort<Object>() {
+ @Override
+ final public void process(Object payload) {
+ }
+ };
+ }
+
+ @Test
+ public void testLogicalPlanSerialization() throws Exception {
+
+ LogicalPlan dag = new LogicalPlan();
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+
+ ValidationOperator validationNode = dag.addOperator("validationNode", ValidationOperator.class);
+ CounterOperator countGoodNode = dag.addOperator("countGoodNode", CounterOperator.class);
+ CounterOperator countBadNode = dag.addOperator("countBadNode", CounterOperator.class);
+ //ConsoleOutputOperator echoBadNode = dag.addOperator("echoBadNode", ConsoleOutputOperator.class);
+
+ // good tuples to counter operator
+ dag.addStream("goodTuplesStream", validationNode.goodOutputPort, countGoodNode.countInputPort);
+
+ // bad tuples to separate stream and echo operator
+ // (stream with 2 outputs)
+ dag.addStream("badTuplesStream", validationNode.badOutputPort, countBadNode.countInputPort);
+
+ Assert.assertEquals("number root operators", 1, dag.getRootOperators().size());
+ Assert.assertEquals("root operator id", "validationNode", dag.getRootOperators().get(0).getName());
+
+ dag.getContextAttributes(countGoodNode).put(OperatorContext.SPIN_MILLIS, 10);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ LogicalPlan.write(dag, bos);
+
+ // System.out.println("serialized size: " + bos.toByteArray().length);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ LogicalPlan dagClone = LogicalPlan.read(bis);
+ Assert.assertNotNull(dagClone);
+ Assert.assertEquals("number operators in clone", dag.getAllOperators().size(), dagClone.getAllOperators().size());
+ Assert.assertEquals("number root operators in clone", 1, dagClone.getRootOperators().size());
+ Assert.assertTrue("root operator in operators", dagClone.getAllOperators().contains(dagClone.getRootOperators().get(0)));
+
+
+ Operator countGoodNodeClone = dagClone.getOperatorMeta("countGoodNode").getOperator();
+ Assert.assertEquals("", new Integer(10), dagClone.getContextAttributes(countGoodNodeClone).get(OperatorContext.SPIN_MILLIS));
+
+ }
+
+ @Test
+ public void testDeleteOperator()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+ dag.addStream("s0", input.outport, o1.inport1);
+ StreamMeta s1 = dag.addStream("s1", o1.outport1, o2.inport1);
+ dag.validate();
+ Assert.assertEquals("", 3, dag.getAllOperators().size());
+
+ dag.removeOperator(o2);
+ s1.remove();
+ dag.validate();
+ Assert.assertEquals("", 2, dag.getAllOperators().size());
+ }
+
+ public static class ValidationTestOperator extends BaseOperator implements InputOperator {
+ @NotNull
+ @Pattern(regexp=".*malhar.*", message="Value has to contain 'malhar'!")
+ private String stringField1;
+
+ @Min(2)
+ private int intField1;
+
+ @AssertTrue(message="stringField1 should end with intField1")
+ private boolean isValidConfiguration() {
+ return stringField1.endsWith(String.valueOf(intField1));
+ }
+
+ private String getterProperty2 = "";
+
+ @NotNull
+ public String getProperty2() {
+ return getterProperty2;
+ }
+
+ public void setProperty2(String s) {
+ // annotations need to be on the getter
+ getterProperty2 = s;
+ }
+
+ private String[] stringArrayField;
+
+ public String[] getStringArrayField() {
+ return stringArrayField;
+ }
+
+ public void setStringArrayField(String[] stringArrayField) {
+ this.stringArrayField = stringArrayField;
+ }
+
+ public class Nested {
+ @NotNull
+ private String property = "";
+
+ public String getProperty() {
+ return property;
+ }
+
+ public void setProperty(String property) {
+ this.property = property;
+ }
+
+ }
+
+ @Valid
+ private final Nested nestedBean = new Nested();
+
+ private String stringProperty2;
+
+ public String getStringProperty2() {
+ return stringProperty2;
+ }
+
+ public void setStringProperty2(String stringProperty2) {
+ this.stringProperty2 = stringProperty2;
+ }
+
+ private Map<String, String> mapProperty = Maps.newHashMap();
+
+ public Map<String, String> getMapProperty()
+ {
+ return mapProperty;
+ }
+
+ public void setMapProperty(Map<String, String> mapProperty)
+ {
+ this.mapProperty = mapProperty;
+ }
+
+ @Override
+ public void emitTuples() {
+ // Emit no tuples
+
+ }
+
+ }
+
+ @Test
+ public void testOperatorValidation() {
+
+ ValidationTestOperator bean = new ValidationTestOperator();
+ bean.stringField1 = "malhar1";
+ bean.intField1 = 1;
+
+ // ensure validation standalone produces expected results
+ ValidatorFactory factory =
+ Validation.buildDefaultValidatorFactory();
+ Validator validator = factory.getValidator();
+ Set<ConstraintViolation<ValidationTestOperator>> constraintViolations =
+ validator.validate(bean);
+ //for (ConstraintViolation<ValidationTestOperator> cv : constraintViolations) {
+ // System.out.println("validation error: " + cv);
+ //}
+ Assert.assertEquals("" + constraintViolations,1, constraintViolations.size());
+ ConstraintViolation<ValidationTestOperator> cv = constraintViolations.iterator().next();
+ Assert.assertEquals("", bean.intField1, cv.getInvalidValue());
+ Assert.assertEquals("", "intField1", cv.getPropertyPath().toString());
+
+ // ensure DAG validation produces matching results
+ LogicalPlan dag = new LogicalPlan();
+ bean = dag.addOperator("testOperator", bean);
+
+ try {
+ dag.validate();
+ Assert.fail("should throw ConstraintViolationException");
+ } catch (ConstraintViolationException e) {
+ Assert.assertEquals("violation details", constraintViolations, e.getConstraintViolations());
+ String expRegex = ".*ValidationTestOperator\\{name=null}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]";
+ Assert.assertThat("exception message", e.getMessage(), RegexMatcher.matches(expRegex));
+ }
+
+ try {
+ bean.intField1 = 3;
+ dag.validate();
+ Assert.fail("should throw ConstraintViolationException");
+ } catch (ConstraintViolationException e) {
+ ConstraintViolation<?> cv2 = e.getConstraintViolations().iterator().next();
+ Assert.assertEquals("" + e.getConstraintViolations(), 1, constraintViolations.size());
+ Assert.assertEquals("", false, cv2.getInvalidValue());
+ Assert.assertEquals("", "validConfiguration", cv2.getPropertyPath().toString());
+ }
+ bean.stringField1 = "malhar3";
+
+ // annotated getter
+ try {
+ bean.getterProperty2 = null;
+ dag.validate();
+ Assert.fail("should throw ConstraintViolationException");
+ } catch (ConstraintViolationException e) {
+ ConstraintViolation<?> cv2 = e.getConstraintViolations().iterator().next();
+ Assert.assertEquals("" + e.getConstraintViolations(), 1, constraintViolations.size());
+ Assert.assertEquals("", null, cv2.getInvalidValue());
+ Assert.assertEquals("", "property2", cv2.getPropertyPath().toString());
+ }
+ bean.getterProperty2 = "";
+
+ // nested property
+ try {
+ bean.nestedBean.property = null;
+ dag.validate();
+ Assert.fail("should throw ConstraintViolationException");
+ } catch (ConstraintViolationException e) {
+ ConstraintViolation<?> cv2 = e.getConstraintViolations().iterator().next();
+ Assert.assertEquals("" + e.getConstraintViolations(), 1, constraintViolations.size());
+ Assert.assertEquals("", null, cv2.getInvalidValue());
+ Assert.assertEquals("", "nestedBean.property", cv2.getPropertyPath().toString());
+ }
+ bean.nestedBean.property = "";
+
+ // all valid
+ dag.validate();
+
+ }
+
+ @OperatorAnnotation(partitionable = false)
+ public static class TestOperatorAnnotationOperator extends BaseOperator {
+
+ @InputPortFieldAnnotation( optional = true)
+ final public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>() {
+ @Override
+ public void process(Object tuple) {
+ }
+ };
+ }
+
+ class NoInputPortOperator extends BaseOperator {
+ }
+
+ @Test
+ public void testValidationForNonInputRootOperator() {
+ LogicalPlan dag = new LogicalPlan();
+ NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator());
+ try {
+ dag.validate();
+ Assert.fail("should fail because root operator is not input operator");
+ } catch (ValidationException e) {
+ // expected
+ }
+ }
+
+ @OperatorAnnotation(partitionable = false)
+ public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2> {
+
+ @Override
+ public Collection<Partition<TestOperatorAnnotationOperator2>> definePartitions(Collection<Partition<TestOperatorAnnotationOperator2>> partitions, PartitioningContext context)
+ {
+ return null;
+ }
+
+ @Override
+ public void partitioned(Map<Integer, Partition<TestOperatorAnnotationOperator2>> partitions)
+ {
+ }
+ }
+
+ @Test
+ public void testOperatorAnnotation() {
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
+ TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class);
+ dag.addStream("Connection", input.outport, operator.input1);
+
+
+ dag.setAttribute(operator, OperatorContext.PARTITIONER, new StatelessPartitioner<TestOperatorAnnotationOperator>(2));
+
+ try {
+ dag.validate();
+ Assert.fail("should raise operator is not partitionable for operator1");
+ } catch (ValidationException e) {
+ Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
+ }
+
+ dag.setAttribute(operator, OperatorContext.PARTITIONER, null);
+ dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, true);
+
+ try {
+ dag.validate();
+ Assert.fail("should raise operator is not partitionable for operator1");
+ } catch (ValidationException e) {
+ Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e.getMessage());
+ }
+
+ dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, false);
+ dag.validate();
+
+ dag.removeOperator(operator);
+ TestOperatorAnnotationOperator2 operator2 = dag.addOperator("operator2", TestOperatorAnnotationOperator2.class);
+
+ try {
+ dag.validate();
+ Assert.fail("should raise operator is not partitionable for operator2");
+ } catch (ValidationException e) {
+ Assert.assertEquals("Operator " + dag.getMeta(operator2).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPortConnectionValidation() {
+
+ LogicalPlan dag = new LogicalPlan();
+
+ TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class);
+
+ try {
+ dag.validate();
+ Assert.fail("should raise port not connected for input1.outputPort1");
+
+ } catch (ValidationException e) {
+ Assert.assertEquals("", "Output port connection required: input1.outport1", e.getMessage());
+ }
+
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ dag.addStream("stream1", input.outport1, o1.inport1);
+ dag.validate();
+
+ // required input
+ dag.addOperator("counter", CounterOperator.class);
+ try {
+ dag.validate();
+ } catch (ValidationException e) {
+ Assert.assertEquals("", "Input port connection required: counter.countInputPort", e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testAtMostOnceProcessingModeValidation() {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
+ TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
+
+ GenericTestOperator amoOper = dag.addOperator("amoOper", GenericTestOperator.class);
+ dag.setAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
+
+ dag.addStream("input1.outport", input1.outport, amoOper.inport1);
+ dag.addStream("input2.outport", input2.outport, amoOper.inport2);
+
+ GenericTestOperator outputOper = dag.addOperator("outputOper", GenericTestOperator.class);
+ dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
+ dag.addStream("aloOper.outport1", amoOper.outport1, outputOper.inport1);
+
+ try {
+ dag.validate();
+ Assert.fail("Exception expected for " + outputOper);
+ } catch (ValidationException ve) {
+ Assert.assertEquals("", ve.getMessage(), "Processing mode outputOper/AT_LEAST_ONCE not valid for source amoOper/AT_MOST_ONCE");
+ }
+ dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, null);
+ dag.validate();
+
+ OperatorMeta outputOperOm = dag.getMeta(outputOper);
+ Assert.assertEquals("" + outputOperOm.getAttributes(), Operator.ProcessingMode.AT_MOST_ONCE, outputOperOm.getValue(OperatorContext.PROCESSING_MODE));
+
+ }
+
+ @Test
+ public void testExactlyOnceProcessingModeValidation() {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
+ TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
+
+ GenericTestOperator amoOper = dag.addOperator("amoOper", GenericTestOperator.class);
+ dag.setAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.EXACTLY_ONCE);
+
+ dag.addStream("input1.outport", input1.outport, amoOper.inport1);
+ dag.addStream("input2.outport", input2.outport, amoOper.inport2);
+
+ GenericTestOperator outputOper = dag.addOperator("outputOper", GenericTestOperator.class);
+ dag.addStream("aloOper.outport1", amoOper.outport1, outputOper.inport1);
+
+ try {
+ dag.validate();
+ Assert.fail("Exception expected for " + outputOper);
+ } catch (ValidationException ve) {
+ Assert.assertEquals("", ve.getMessage(), "Processing mode for outputOper should be AT_MOST_ONCE for source amoOper/EXACTLY_ONCE");
+ }
+
+ dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
+
+ try {
+ dag.validate();
+ Assert.fail("Exception expected for " + outputOper);
+ } catch (ValidationException ve) {
+ Assert.assertEquals("", ve.getMessage(), "Processing mode outputOper/AT_LEAST_ONCE not valid for source amoOper/EXACTLY_ONCE");
+ }
+
+ // AT_MOST_ONCE is valid
+ dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
+ dag.validate();
+ }
+
+ @Test
+ public void testLocalityValidation() {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ StreamMeta s1 = dag.addStream("input1.outport", input1.outport, o1.inport1).setLocality(Locality.THREAD_LOCAL);
+ dag.validate();
+
+ TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
+ dag.addStream("input2.outport", input2.outport, o1.inport2);
+
+ try {
+ dag.validate();
+ Assert.fail("Exception expected for " + o1);
+ } catch (ValidationException ve) {
+ Assert.assertThat("", ve.getMessage(), RegexMatcher.matches("Locality THREAD_LOCAL invalid for operator .* with multiple input streams .*"));
+ }
+
+ s1.setLocality(null);
+ dag.validate();
+ }
+
+ private class TestAnnotationsOperator extends BaseOperator implements InputOperator {
+ //final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+
+ @OutputPortFieldAnnotation( optional=false)
+ final public transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
+
+ @Override
+ public void emitTuples() {
+ // Emit Nothing
+
+ }
+ }
+
+ private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator{
+ // multiple ports w/o annotation, one of them must be connected
+ final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+
+ @Override
+ public void emitTuples() {
+ // Emit Nothing
+
+ }
+ }
+
+ private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator{
+ // multiple ports w/o annotation, one of them must be connected
+ @OutputPortFieldAnnotation( optional=true)
+ final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+ @OutputPortFieldAnnotation( optional=true)
+ final public transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
+ @Override
+ public void emitTuples() {
+ // Emit Nothing
+
+ }
+ }
+
+ @Test
+ public void testOutputPortAnnotation() {
+ LogicalPlan dag = new LogicalPlan();
+ TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
+
+ try {
+ dag.validate();
+ Assert.fail("should raise: port connection required");
+ } catch (ValidationException e) {
+ Assert.assertEquals("", "Output port connection required: testAnnotationsOperator.outport2", e.getMessage());
+ }
+
+ TestOutputOperator o2 = dag.addOperator("sink", new TestOutputOperator());
+ dag.addStream("s1", ta1.outport2, o2.inport);
+
+ dag.validate();
+
+ TestAnnotationsOperator2 ta2 = dag.addOperator("multiOutputPorts1", new TestAnnotationsOperator2());
+
+ try {
+ dag.validate();
+ Assert.fail("should raise: At least one output port must be connected");
+ } catch (ValidationException e) {
+ Assert.assertEquals("", "At least one output port must be connected: multiOutputPorts1", e.getMessage());
+ }
+ TestOutputOperator o3 = dag.addOperator("o3", new TestOutputOperator());
+ dag.addStream("s2", ta2.outport1, o3.inport);
+
+ dag.addOperator("multiOutputPorts3", new TestAnnotationsOperator3());
+ dag.validate();
+
+ }
+
+ /**
+ * Operator that can be used with default Java serialization instead of Kryo
+ */
+ @DefaultSerializer(JavaSerializer.class)
+ public static class JdkSerializableOperator extends BaseOperator implements Serializable {
+ private static final long serialVersionUID = -4024202339520027097L;
+
+ public abstract class SerializableInputPort<T> implements InputPort<T>, Sink<T>, java.io.Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Sink<T> getSink() {
+ return this;
+ }
+
+ @Override
+ public void setConnected(boolean connected) {
+ }
+
+ @Override
+ public void setup(PortContext context)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ @Override
+ public StreamCodec<T> getStreamCodec() {
+ return null;
+ }
+ }
+
+ @InputPortFieldAnnotation( optional=true)
+ final public InputPort<Object> inport1 = new SerializableInputPort<Object>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ final public void put(Object payload)
+ {
+ }
+
+ @Override
+ public int getCount(boolean reset)
+ {
+ return 0;
+ }
+
+ };
+ }
+
+ @Test
+ public void testJdkSerializableOperator() throws Exception {
+ LogicalPlan dag = new LogicalPlan();
+ dag.addOperator("o1", new JdkSerializableOperator());
+
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ LogicalPlan.write(dag, outStream);
+ outStream.close();
+
+ LogicalPlan clonedDag = LogicalPlan.read(new ByteArrayInputStream(outStream.toByteArray()));
+ JdkSerializableOperator o1Clone = (JdkSerializableOperator)clonedDag.getOperatorMeta("o1").getOperator();
+ Assert.assertNotNull("port object null", o1Clone.inport1);
+ }
+
+ private static class TestStreamCodec implements StreamCodec<Object> {
+ @Override
+ public Object fromByteArray(Slice fragment)
+ {
+ return fragment.stringValue();
+ }
+
+ @Override
+ public Slice toByteArray(Object o)
+ {
+ byte[] b = o.toString().getBytes();
+ return new Slice(b, 0, b.length);
+ }
+
+ @Override
+ public int getPartition(Object o)
+ {
+ return o.hashCode() / 2;
+ }
+ }
+
+ public static class TestPortCodecOperator extends BaseOperator {
+ public transient final DefaultInputPort<Object> inport1 = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+
+ }
+
+ @Override
+ public StreamCodec<Object> getStreamCodec()
+ {
+ return new TestStreamCodec();
+ }
+ };
+
+ @OutputPortFieldAnnotation( optional = true)
+ public transient final DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
+ }
+
+ /*
+ @Test
+ public void testStreamCodec() throws Exception {
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input = dag.addOperator("input", TestGeneratorInputOperator.class);
+ GenericTestOperator gto1 = dag.addOperator("gto1", GenericTestOperator.class);
+ StreamMeta stream1 = dag.addStream("s1", input.outport, gto1.inport1);
+ StreamCodec<?> codec1 = new TestStreamCodec();
+ dag.setInputPortAttribute(gto1.inport1, PortContext.STREAM_CODEC, codec1);
+ dag.validate();
+ //Assert.assertEquals("Stream codec not set", stream1.getStreamCodec(), codec1);
+
+ GenericTestOperator gto2 = dag.addOperator("gto2", GenericTestOperator.class);
+ GenericTestOperator gto3 = dag.addOperator("gto3", GenericTestOperator.class);
+ StreamMeta stream2 = dag.addStream("s2", gto1.outport1, gto2.inport1, gto3.inport1);
+ dag.setInputPortAttribute(gto2.inport1, PortContext.STREAM_CODEC, codec1);
+ try {
+ dag.validate();
+ } catch (ValidationException e) {
+ String msg = e.getMessage();
+ if (!msg.startsWith("Stream codec not set on input port") || !msg.contains("gto3")
+ || !msg.contains(codec1.toString()) || !msg.endsWith("was specified on another port")) {
+ Assert.fail(String.format("LogicalPlan validation error msg: %s", msg));
+ }
+ }
+
+ dag.setInputPortAttribute(gto3.inport1, PortContext.STREAM_CODEC, codec1);
+ dag.validate();
+ //Assert.assertEquals("Stream codec not set", stream2.getStreamCodec(), codec1);
+
+ StreamCodec<?> codec2 = new TestStreamCodec();
+ dag.setInputPortAttribute(gto3.inport1, PortContext.STREAM_CODEC, codec2);
+ try {
+ dag.validate();
+ } catch (ValidationException e) {
+ String msg = e.getMessage();
+ if (!msg.startsWith("Conflicting stream codec set on input port") || !msg.contains("gto3")
+ || !msg.contains(codec2.toString()) || !msg.endsWith("was specified on another port")) {
+ Assert.fail(String.format("LogicalPlan validation error msg: %s", msg));
+ }
+ }
+
+ dag.setInputPortAttribute(gto3.inport1, PortContext.STREAM_CODEC, codec1);
+ TestPortCodecOperator pco = dag.addOperator("pco", TestPortCodecOperator.class);
+ StreamMeta stream3 = dag.addStream("s3", gto2.outport1, pco.inport1);
+ dag.validate();
+ //Assert.assertEquals("Stream codec class not set", stream3.getCodecClass(), TestStreamCodec.class);
+
+ dag.setInputPortAttribute(pco.inport1, PortContext.STREAM_CODEC, codec2);
+ dag.validate();
+ //Assert.assertEquals("Stream codec not set", stream3.getStreamCodec(), codec2);
+ }
+ */
+
+ @Test
+ public void testCheckpointableWithinAppWindowAnnotation()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
+ GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
+ dag.addStream("Stream1", input1.outport, x.inport1);
+ dag.setAttribute(x, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
+ dag.setAttribute(x, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
+ dag.validate();
+
+ TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
+ CheckpointableWithinAppWindowOperator y = dag.addOperator("y", new CheckpointableWithinAppWindowOperator());
+ dag.addStream("Stream2", input2.outport, y.inport1);
+ dag.setAttribute(y, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
+ dag.setAttribute(y, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
+ dag.validate();
+
+ TestGeneratorInputOperator input3 = dag.addOperator("input3", TestGeneratorInputOperator.class);
+ NotCheckpointableWithinAppWindowOperator z = dag.addOperator("z", new NotCheckpointableWithinAppWindowOperator());
+ dag.addStream("Stream3", input3.outport, z.inport1);
+ dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
+ dag.setAttribute(z, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
+ try {
+ dag.validate();
+ Assert.fail("should fail because chekpoint window count is not a factor of application window count");
+ }
+ catch (ValidationException e) {
+ // expected
+ }
+
+ dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 30);
+ dag.validate();
+
+ dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 45);
+ try {
+ dag.validate();
+ Assert.fail("should fail because chekpoint window count is not a factor of application window count");
+ }
+ catch (ValidationException e) {
+ // expected
+ }
+ }
+
+ @OperatorAnnotation(checkpointableWithinAppWindow = true)
+ class CheckpointableWithinAppWindowOperator extends GenericTestOperator
+ {
+ }
+
+ @OperatorAnnotation(checkpointableWithinAppWindow = false)
+ class NotCheckpointableWithinAppWindowOperator extends GenericTestOperator
+ {
+ }
+
+ @Test
+ public void testInputPortHiding()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
+ Operator2 operator2 = dag.addOperator("operator2", new Operator2());
+ dag.addStream("Stream1", input1.outport, operator2.input);
+ dag.validate();
+ }
+
+ @Test
+ public void testInvalidInputPortConnection()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
+ Operator1 operator1 = dag.addOperator("operator3", new Operator3());
+ dag.addStream("Stream1", input1.outport, operator1.input);
+ try {
+ dag.validate();
+ } catch (ValidationException ex) {
+ Assert.assertTrue("validation message", ex.getMessage().startsWith("Invalid port connected"));
+ return;
+ }
+ Assert.fail();
+ }
+
+ class Operator1 extends BaseOperator
+ {
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+
+ }
+ };
+ }
+
+ class Operator2 extends Operator1
+ {
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+
+ }
+ };
+ }
+
+ class Operator3 extends Operator1
+ {
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+
+ }
+ };
+ }
+
+ @Test
+ public void testOutputPortHiding()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ Operator5 operator5 = dag.addOperator("input", new Operator5());
+ Operator2 operator2 = dag.addOperator("operator2", new Operator2());
+ dag.addStream("Stream1", operator5.output, operator2.input);
+ dag.validate();
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testInvalidOutputPortConnection()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ Operator4 operator4 = dag.addOperator("input", new Operator5());
+ Operator3 operator3 = dag.addOperator("operator3", new Operator3());
+ dag.addStream("Stream1", operator4.output, operator3.input);
+ dag.validate();
+ }
+
+ class Operator4 extends BaseOperator implements InputOperator
+ {
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+
+ @Override
+ public void emitTuples()
+ {
+
+ }
+ }
+
+ class Operator5 extends Operator4
+ {
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+ }
+
+ /*
+ These were tests for operator semantics that verified if an operator class implements InputOperator then the same class should not declare input ports.
+ This would be done later when we are able to verify user code at compile-time.
+
+ validation()
+ {
+ if (n.getOperator() instanceof InputOperator) {
+ try {
+ for (Class<?> clazz : n.getOperator().getClass().getInterfaces()) {
+ if (clazz.getName().equals(InputOperator.class.getName())) {
+ for (Field field : n.getOperator().getClass().getDeclaredFields()) {
+ field.setAccessible(true);
+ Object declaredObject = field.get(n.getOperator());
+ if (declaredObject instanceof InputPort) {
+ throw new ValidationException("Operator class implements InputOperator and also declares input ports: " + n.name);
+ }
+ }
+ break;
+ }
+ }
+ }
+ catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ @Test
+ public void testInvalidInputOperatorDeclaration()
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator.InvalidInputOperator inputOperator = dag.addOperator("input", new TestGeneratorInputOperator.InvalidInputOperator());
+ GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
+
+ dag.addStream("stream1", inputOperator.outport, operator2.inport1);
+
+ try {
+ dag.validate();
+ fail("validation should fail");
+ }
+ catch (ValidationException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testValidInputOperatorDeclaration()
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator.ValidGenericOperator operator1 = dag.addOperator("input", new TestGeneratorInputOperator.ValidGenericOperator());
+ GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
+
+ dag.addStream("stream1", operator1.outport, operator2.inport1);
+ dag.validate();
+ }
+ */
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/test/resources/schemaTestTopology.json
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/schemaTestTopology.json b/engine/src/test/resources/schemaTestTopology.json
index 6c779fd..46c0e78 100644
--- a/engine/src/test/resources/schemaTestTopology.json
+++ b/engine/src/test/resources/schemaTestTopology.json
@@ -36,7 +36,7 @@
}
],
"schema": {
- "class": "com.datatorrent.stram.plan.LogicalPlanConfigurationTest$TestSchema"
+ "class": "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestSchema"
}
}
]