You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/03/28 17:45:58 UTC
[1/2] incubator-apex-malhar git commit: APEXMALHAR-2015: Projection
Operator & its unit tests - Projection Operator - Unit tests for select/drop
fields & projection operator
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master c829adaf1 -> 79eeff782
APEXMALHAR-2015: Projection Operator & its unit tests
- Projection Operator
- Unit tests for select/drop fields & projection operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/19029582
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/19029582
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/19029582
Branch: refs/heads/master
Commit: 19029582be0725319597aa6f680a8a61ab7b015e
Parents: becee7f
Author: Pradeep A. Dalvi <pr...@datatorrent.com>
Authored: Wed Mar 16 17:39:17 2016 +0530
Committer: Pradeep A. Dalvi <pr...@datatorrent.com>
Committed: Wed Mar 23 14:10:45 2016 +0530
----------------------------------------------------------------------
.../lib/projection/ProjectionOperator.java | 310 +++++++++++++++++++
.../lib/projection/ActivateTest.java | 160 ++++++++++
.../lib/projection/ProjectionTest.java | 276 +++++++++++++++++
3 files changed, 746 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/19029582/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
new file mode 100644
index 0000000..8598de9
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.projection;
+
+import java.lang.reflect.Field;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * <b>ProjectionOperator</b>
+ * Projection Operator projects defined set of fields from given selectFields/dropFields
+ *
+ * <b>Parameters</b>
+ * - selectFields: comma separated list of fields to be selected from input tuples
+ * - dropFields: comma separated list of fields to be dropped from input tuples
+ * selectFields and dropFields are optional and either of them shall be specified
+ * When both are not specified, all fields shall be projected to downstream operator
+ * When both are specified, selectFields shall be given the preference
+ *
+ * <b>Input Port</b> takes POJOs as an input
+ *
+ * <b>Output Ports</b>
+ * - projected port emits POJOs with projected fields from input POJOs
+ * - remainder port, if connected, emits POJOs with remainder fields from input POJOs
+ * - error port emits input POJOs as is upon error situations
+ *
+ * <b>Examples</b>
+ * For {a, b, c} type of input tuples
+ * - when selectFields = "" and dropFields = "", projected port shall emit {a, b, c}
+ * - when selectFields = "a" and dropFields = "b", projected port shall emit {a}, remainder {b, c}
+ * - when selectFields = "b", projected port shall emit {b} and remainder port shall emit {a, c}
+ * - when dropFields = "b", projected port shall emit {a, c} and remainder port shall emit {b}
+ *
+ */
+public class ProjectionOperator extends BaseOperator implements Operator.ActivationListener<Context>
+{
+ protected String selectFields;
+ protected String dropFields;
+ protected String condition;
+
+ static class TypeInfo
+ {
+ String name;
+ Class type;
+ PojoUtils.Setter setter;
+ PojoUtils.Getter getter;
+
+ public TypeInfo(String name, Class<?> type)
+ {
+ this.name = name;
+ this.type = type;
+ }
+
+ public String toString()
+ {
+ String s = new String("'name': " + name + " 'type': " + type);
+ return s;
+ }
+ }
+
+ private transient List<TypeInfo> projectedFields = new ArrayList<>();
+ private transient List<TypeInfo> remainderFields = new ArrayList<>();
+
+ @VisibleForTesting
+ List<TypeInfo> getProjectedFields()
+ {
+ return projectedFields;
+ }
+
+ @VisibleForTesting
+ List<TypeInfo> getRemainderFields()
+ {
+ return remainderFields;
+ }
+
+ @AutoMetric
+ protected long projectedTuples;
+
+ @AutoMetric
+ protected long remainderTuples;
+
+ @AutoMetric
+ protected long errorTuples;
+
+ protected Class<?> inClazz = null;
+ protected Class<?> projectedClazz = null;
+ protected Class<?> remainderClazz = null;
+
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ public void setup(PortContext context)
+ {
+ inClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+
+ @Override
+ public void process(Object t)
+ {
+ handleProjection(t);
+ }
+ };
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> projected = new DefaultOutputPort<Object>()
+ {
+ public void setup(PortContext context)
+ {
+ projectedClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> remainder = new DefaultOutputPort<Object>()
+ {
+ public void setup(PortContext context)
+ {
+ remainderClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+
+ public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<Object>();
+
+ /**
+ * addProjectedField: Add field details (name, type, getter and setter) for field with given name
+ * in projectedFields list
+ */
+ protected void addProjectedField(String s)
+ {
+ try {
+ Field f = inClazz.getDeclaredField(s);
+ TypeInfo t = new TypeInfo(f.getName(), ClassUtils.primitiveToWrapper(f.getType()));
+ t.getter = PojoUtils.createGetter(inClazz, t.name, t.type);
+ t.setter = PojoUtils.createSetter(projectedClazz, t.name, t.type);
+ projectedFields.add(t);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("Field " + s + " not found in class " + inClazz, e);
+ }
+ }
+
+ /**
+ * addRemainderField: Add field details (name, type, getter and setter) for field with given name
+ * in remainderFields list
+ */
+ protected void addRemainderField(String s)
+ {
+ try {
+ Field f = inClazz.getDeclaredField(s);
+ TypeInfo t = new TypeInfo(f.getName(), ClassUtils.primitiveToWrapper(f.getType()));
+ t.getter = PojoUtils.createGetter(inClazz, t.name, t.type);
+ t.setter = PojoUtils.createSetter(remainderClazz, t.name, t.type);
+ remainderFields.add(t);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("Field " + s + " not found in class " + inClazz, e);
+ }
+ }
+
+ @Override
+ public void activate(Context context)
+ {
+ final Field[] allFields = inClazz.getDeclaredFields();
+
+ if (selectFields != null && !selectFields.isEmpty()) {
+ List<String> sFields = Arrays.asList(selectFields.split(","));
+ for (String s : sFields) {
+ addProjectedField(s);
+ }
+
+ if (remainderClazz != null) {
+ for (Field f : allFields) {
+ if (!sFields.contains(f.getName())) {
+ addRemainderField(f.getName());
+ }
+ }
+ } else {
+ logger.info("Remainder Port does not have Schema class defined");
+ }
+ } else {
+ List<String> dFields = new ArrayList<>();
+ if (dropFields != null && !dropFields.isEmpty()) {
+ dFields = Arrays.asList(dropFields.split(","));
+ if (remainderClazz != null) {
+ for (String s : dFields) {
+ addRemainderField(s);
+ }
+ } else {
+ logger.info("Remainder Port does not have Schema class defined");
+ }
+ }
+
+ for (Field f : allFields) {
+ if (!dFields.contains(f.getName())) {
+ addProjectedField(f.getName());
+ }
+ }
+ }
+
+ logger.debug("projected fields: {}", projectedFields);
+ logger.debug("remainder fields: {}", remainderFields);
+ }
+
+ @Override
+ public void deactivate()
+ {
+ projectedFields.clear();
+ remainderFields.clear();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ errorTuples = projectedTuples = remainderTuples = 0;
+ }
+
+ protected Object getProjectedObject(Object t) throws IllegalAccessException
+ {
+ try {
+ Object p = projectedClazz.newInstance();
+ for (TypeInfo ti: projectedFields) {
+ ti.setter.set(p, ti.getter.get(t));
+ }
+ return p;
+ } catch (InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw e;
+ }
+ }
+
+ protected Object getRemainderObject(Object t) throws IllegalAccessException
+ {
+ try {
+ Object r = remainderClazz.newInstance();
+ for (TypeInfo ti: remainderFields) {
+ ti.setter.set(r, ti.getter.get(t));
+ }
+ return r;
+ } catch (InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw e;
+ }
+ }
+
+ /**
+ * handleProjection: emit projected object on projected port
+ * and remainder object on remainder port if that is connected.
+ */
+ private void handleProjection(Object t)
+ {
+ try {
+ Object p = getProjectedObject(t);
+
+ if (remainder.isConnected()) {
+ Object r = getRemainderObject(t);
+ remainder.emit(r);
+ remainderTuples++;
+ }
+
+ projected.emit(p);
+ projectedTuples++;
+ } catch (IllegalAccessException e) {
+ error.emit(t);
+ errorTuples++;
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ProjectionOperator.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/19029582/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java b/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java
new file mode 100644
index 0000000..f0b684f
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.projection;
+
+import java.util.Date;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for Projection related Activate method
+ */
+public class ActivateTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(ActivateTest.class);
+
+ public static class DummyPOJO
+ {
+ private long l;
+ private String str;
+ private Date date;
+
+ public long getL()
+ {
+ return l;
+ }
+
+ public void setL(long l)
+ {
+ this.l = l;
+ }
+
+ public String getStr()
+ {
+ return str;
+ }
+
+ public void setStr(String str)
+ {
+ this.str = str;
+ }
+
+ public Date getDate()
+ {
+ return date;
+ }
+
+ public void setDate(Date date)
+ {
+ this.date = date;
+ }
+ }
+
+ private static ProjectionOperator projection;
+ private static DummyPOJO data;
+
+ @Test
+ public void testSelectDropFieldsNull()
+ {
+ logger.debug("start round 0");
+ projection.selectFields = null;
+ projection.dropFields = null;
+ projection.activate(null);
+ Assert.assertEquals("projected fields", 3, projection.getProjectedFields().size());
+ Assert.assertEquals("remainder fields", 0, projection.getRemainderFields().size());
+ projection.deactivate();
+ logger.debug("start round 0");
+ }
+
+ @Test
+ public void testSelectDropFieldsEmpty()
+ {
+ logger.debug("start round 0");
+ projection.selectFields = "";
+ projection.dropFields = "";
+ projection.activate(null);
+ Assert.assertEquals("projected fields", 3, projection.getProjectedFields().size());
+ Assert.assertEquals("remainder fields", 0, projection.getRemainderFields().size());
+ projection.deactivate();
+ logger.debug("start round 0");
+ }
+
+ @Test
+ public void testSelectFields()
+ {
+ logger.debug("start round 0");
+ projection.selectFields = "l,str";
+ projection.dropFields = "";
+ projection.activate(null);
+ Assert.assertEquals("projected fields", 2, projection.getProjectedFields().size());
+ Assert.assertEquals("remainder fields", 1, projection.getRemainderFields().size());
+ projection.deactivate();
+ logger.debug("start round 0");
+ }
+
+ @Test
+ public void testDropFields()
+ {
+ logger.debug("start round 0");
+ projection.selectFields = "";
+ projection.dropFields = "str,date";
+ projection.activate(null);
+ Assert.assertEquals("projected fields", 1, projection.getProjectedFields().size());
+ Assert.assertEquals("remainder fields", 2, projection.getRemainderFields().size());
+ projection.deactivate();
+ logger.debug("start round 0");
+ }
+
+ @Test
+ public void testBothFieldsSpecified()
+ {
+ logger.debug("start round 0");
+ projection.selectFields = "";
+ projection.selectFields = "l,str";
+ projection.dropFields = "str,date";
+ projection.activate(null);
+ Assert.assertEquals("projected fields", 2, projection.getProjectedFields().size());
+ Assert.assertEquals("remainder fields", 1, projection.getRemainderFields().size());
+ projection.deactivate();
+ logger.debug("start round 0");
+ }
+
+ @BeforeClass
+ public static void setup()
+ {
+ data = new DummyPOJO();
+ projection = new ProjectionOperator();
+ projection.inClazz = DummyPOJO.class;
+ projection.projectedClazz = DummyPOJO.class;
+ projection.remainderClazz = DummyPOJO.class;
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ projection.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/19029582/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java b/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java
new file mode 100644
index 0000000..a47b167
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.projection;
+
+import java.lang.reflect.Field;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Tests for ProjectionOperator
+ */
+public class ProjectionTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(ProjectionTest.class);
+
+ public static class DummyPOJO
+ {
+ private long projected;
+ private long remainder;
+
+ public long getProjected()
+ {
+ return projected;
+ }
+
+ public void setProjected(long projected)
+ {
+ this.projected = projected;
+ }
+
+ public long getRemainder()
+ {
+ return remainder;
+ }
+
+ public void setRemainder(long remainder)
+ {
+ this.remainder = remainder;
+ }
+ }
+
+ public static class ProjectedPOJO
+ {
+ private long projected;
+
+ public long getProjected()
+ {
+ return projected;
+ }
+
+ public void setProjected(long projected)
+ {
+ this.projected = projected;
+ }
+ }
+
+ public static class RemainderPOJO
+ {
+ private long remainder;
+
+ public long getRemainder()
+ {
+ return remainder;
+ }
+
+ public void setRemainder(long remainder)
+ {
+ this.remainder = remainder;
+ }
+ }
+
+ private static ProjectionOperator projection;
+ private static DummyPOJO data;
+
+ public Long getFieldValue(Object p, String field)
+ {
+ Long value = new Long(0);
+
+ for (Field f: p.getClass().getDeclaredFields()) {
+ f.setAccessible(true);
+ try {
+ logger.debug("{} field: {} type: {} val: {}", field, f.getName(), f.getType(), f.get(p));
+ } catch (IllegalAccessException e) {
+ logger.info("could not access value of field: {} type: {}", f.getName(), f.getType());
+ }
+ }
+
+ try {
+ value = (Long)p.getClass().getDeclaredField(field).get(p);
+ } catch (NoSuchFieldException e) {
+ Assert.assertTrue(e instanceof NoSuchFieldException);
+ } catch (IllegalAccessException e) {
+ Assert.assertTrue(e instanceof IllegalAccessException);
+ }
+
+ return value;
+ }
+
+ public void checkProjected(Object p, Integer val)
+ {
+ Long value = ((ProjectedPOJO)p).getProjected();
+
+ Assert.assertEquals("projected field value", new Long(val), value);
+ }
+
+ public void checkRemainder(Object r, Integer val)
+ {
+ Long value = ((RemainderPOJO)r).getRemainder();
+
+ Assert.assertEquals("remainder field value", new Long(val), value);
+ }
+
+ @Test
+ public void testProjectionRemainder()
+ {
+ logger.debug("start round 0");
+ projection.beginWindow(0);
+
+ data.setProjected(1234);
+ data.setRemainder(6789);
+
+ Object p = null;
+ try {
+ p = projection.getProjectedObject(data);
+ } catch (IllegalAccessException e) {
+ Assert.assertTrue(e instanceof IllegalAccessException);
+ }
+ logger.debug("projected class {}", p.getClass());
+
+ Object r = null;
+ try {
+ r = projection.getRemainderObject(data);
+ } catch (IllegalAccessException e) {
+ Assert.assertTrue(e instanceof IllegalAccessException);
+ }
+ logger.debug("remainder class {}", r.getClass());
+
+ checkProjected(p, 1234);
+ checkRemainder(r, 6789);
+
+ projection.endWindow();
+ logger.debug("end round 0");
+ }
+
+ @Test
+ public void testProjected()
+ {
+ logger.debug("start round 0");
+ projection.beginWindow(0);
+
+ data.setProjected(2345);
+ data.setRemainder(5678);
+
+ Object p = null;
+ try {
+ p = projection.getProjectedObject(data);
+ } catch (IllegalAccessException e) {
+ Assert.assertTrue(e instanceof IllegalAccessException);
+ }
+ logger.debug("projected class {}", p.getClass());
+
+ checkProjected(p, 2345);
+
+ projection.endWindow();
+ logger.debug("end round 0");
+ }
+
+ @Test
+ public void testRemainder()
+ {
+ logger.debug("start round 0");
+ projection.beginWindow(0);
+
+ data.setProjected(9876);
+ data.setRemainder(4321);
+
+ Object r = null;
+ try {
+ r = projection.getRemainderObject(data);
+ } catch (IllegalAccessException e) {
+ Assert.assertTrue(e instanceof IllegalAccessException);
+ }
+ logger.debug("remainder class {}", r.getClass());
+
+ checkRemainder(r, 4321);
+
+ projection.endWindow();
+ logger.debug("end round 0");
+ }
+
+ @Test
+ public void testProjection()
+ {
+ logger.debug("start round 0");
+ projection.beginWindow(0);
+
+ projection.input.process(data);
+ Assert.assertEquals("projected tuples", 1, projection.projectedTuples);
+ Assert.assertEquals("remainder tuples", 0, projection.remainderTuples);
+
+ projection.endWindow();
+ logger.debug("end round 0");
+
+ CollectorTestSink projectedSink = new CollectorTestSink();
+ CollectorTestSink remainderSink = new CollectorTestSink();
+
+ projection.projected.setSink(projectedSink);
+ projection.remainder.setSink(remainderSink);
+
+ /* Collector Sink Test when remainder port is connected */
+ logger.debug("start round 1");
+ projection.beginWindow(1);
+
+ data.setProjected(4321);
+ data.setRemainder(9876);
+
+ projection.input.process(data);
+ Assert.assertEquals("projected tuples", 1, projection.projectedTuples);
+ Assert.assertEquals("remainder tuples", 1, projection.remainderTuples);
+
+ Object p = projectedSink.collectedTuples.get(0);
+ Object r = remainderSink.collectedTuples.get(0);
+
+ checkProjected(p, 4321);
+ checkRemainder(r, 9876);
+
+ projection.endWindow();
+ logger.debug("end round 1");
+ }
+
+ @BeforeClass
+ public static void setup()
+ {
+ data = new DummyPOJO();
+ projection = new ProjectionOperator();
+ projection.inClazz = DummyPOJO.class;
+ projection.projectedClazz = ProjectedPOJO.class;
+ projection.remainderClazz = RemainderPOJO.class;
+
+
+ projection.selectFields = "projected";
+ projection.activate(null);
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ projection.deactivate();
+ projection.teardown();
+ }
+}
[2/2] incubator-apex-malhar git commit: Merge branch 'master' of
https://github.com/pradeepdalvi/incubator-apex-malhar
Posted by ch...@apache.org.
Merge branch 'master' of https://github.com/pradeepdalvi/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/79eeff78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/79eeff78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/79eeff78
Branch: refs/heads/master
Commit: 79eeff782f5cba09bda633dc760cd34190b694c4
Parents: c829ada 1902958
Author: chinmaykolhatkar <ch...@datatorrent.com>
Authored: Mon Mar 28 20:57:45 2016 +0530
Committer: chinmaykolhatkar <ch...@datatorrent.com>
Committed: Mon Mar 28 20:57:45 2016 +0530
----------------------------------------------------------------------
.../lib/projection/ProjectionOperator.java | 310 +++++++++++++++++++
.../lib/projection/ActivateTest.java | 160 ++++++++++
.../lib/projection/ProjectionTest.java | 276 +++++++++++++++++
3 files changed, 746 insertions(+)
----------------------------------------------------------------------