You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/03/28 17:45:58 UTC

[1/2] incubator-apex-malhar git commit: APEXMALHAR-2015: Projection Operator & its unit tests - Projection Operator - Unit tests for select/drop fields & projection operator

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master c829adaf1 -> 79eeff782


APEXMALHAR-2015: Projection Operator & its unit tests
 - Projection Operator
 - Unit tests for select/drop fields & projection operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/19029582
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/19029582
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/19029582

Branch: refs/heads/master
Commit: 19029582be0725319597aa6f680a8a61ab7b015e
Parents: becee7f
Author: Pradeep A. Dalvi <pr...@datatorrent.com>
Authored: Wed Mar 16 17:39:17 2016 +0530
Committer: Pradeep A. Dalvi <pr...@datatorrent.com>
Committed: Wed Mar 23 14:10:45 2016 +0530

----------------------------------------------------------------------
 .../lib/projection/ProjectionOperator.java      | 310 +++++++++++++++++++
 .../lib/projection/ActivateTest.java            | 160 ++++++++++
 .../lib/projection/ProjectionTest.java          | 276 +++++++++++++++++
 3 files changed, 746 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/19029582/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
new file mode 100644
index 0000000..8598de9
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.projection;
+
+import java.lang.reflect.Field;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * <b>ProjectionOperator</b>
+ * Projects a subset of the fields of each input POJO, as defined by selectFields/dropFields.
+ *
+ * <b>Parameters</b>
+ * - selectFields: comma separated list of fields to be selected from input tuples
+ * - dropFields: comma separated list of fields to be dropped from input tuples
+ * selectFields and dropFields are both optional; normally at most one of them is specified.
+ * When neither is specified, all fields are projected to the downstream operator.
+ * When both are specified, selectFields takes precedence and dropFields is ignored.
+ *
+ * <b>Input Port</b> takes POJOs as an input
+ *
+ * <b>Output Ports</b>
+ * - projected port emits POJOs with projected fields from input POJOs
+ * - remainder port, if connected, emits POJOs with remainder fields from input POJOs
+ * - error port emits input POJOs as is upon error situations
+ * 
+ * <b>Examples</b>
+ * For {a, b, c} type of input tuples
+ *  - when selectFields = "" and dropFields = "", projected port shall emit {a, b, c}
+ *  - when selectFields = "a" and dropFields = "b", projected port shall emit {a}, remainder {b, c}
+ *  - when selectFields = "b", projected port shall emit {b} and remainder port shall emit {a, c}
+ *  - when dropFields = "b", projected port shall emit {a, c} and remainder port shall emit {b}
+ *
+ */
+public class ProjectionOperator extends BaseOperator implements Operator.ActivationListener<Context>
+{
+  protected String selectFields;
+  protected String dropFields;
+  protected String condition;
+
+  /**
+   * Describes one field that is copied from an input POJO to an output POJO:
+   * its name, its type, and the accessors used to read and write it.
+   */
+  static class TypeInfo
+  {
+    String name;
+    // Wildcard type instead of the raw Class, matching the constructor parameter.
+    Class<?> type;
+    // Not set by the constructor; assigned by the code that builds the field lists.
+    PojoUtils.Setter setter;
+    PojoUtils.Getter getter;
+
+    public TypeInfo(String name, Class<?> type)
+    {
+      this.name = name;
+      this.type = type;
+    }
+
+    /**
+     * @return the field name and type in the form {@code 'name': x 'type': y}
+     */
+    @Override
+    public String toString()
+    {
+      return "'name': " + name + " 'type': " + type;
+    }
+  }
+
+  private transient List<TypeInfo> projectedFields = new ArrayList<>();
+  private transient List<TypeInfo> remainderFields = new ArrayList<>();
+
+  /**
+   * Returns the internal list of projected field descriptors (not a copy).
+   */
+  @VisibleForTesting
+  List<TypeInfo> getProjectedFields()
+  {
+    return projectedFields;
+  }
+
+  /**
+   * Returns the internal list of remainder field descriptors (not a copy).
+   */
+  @VisibleForTesting
+  List<TypeInfo> getRemainderFields()
+  {
+    return remainderFields;
+  }
+
+  @AutoMetric
+  protected long projectedTuples;
+
+  @AutoMetric
+  protected long remainderTuples;
+
+  @AutoMetric
+  protected long errorTuples;
+
+  protected Class<?> inClazz = null;
+  protected Class<?> projectedClazz = null;
+  protected Class<?> remainderClazz = null;
+
+  /**
+   * Input port for POJO tuples. On setup it records the tuple class configured
+   * on the port; every received tuple is passed to handleProjection.
+   */
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      inClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object t)
+    {
+      handleProjection(t);
+    }
+  };
+
+  /**
+   * Output port for the projected POJOs. On setup it records the tuple class
+   * configured on the port, which is the class instantiated for projected tuples.
+   */
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> projected = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      projectedClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  /**
+   * Output port for the remainder POJOs. On setup it records the tuple class
+   * configured on the port, which is the class instantiated for remainder tuples.
+   */
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> remainder = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      remainderClazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+
+  public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<Object>();
+
+  /**
+   * Looks up the field named {@code s} on the input class and appends its
+   * descriptor (name, wrapper type, getter on the input class, setter on the
+   * projected class) to the projectedFields list.
+   *
+   * @param s name of a field declared on the input class
+   * @throws RuntimeException if the input class declares no such field
+   */
+  protected void addProjectedField(String s)
+  {
+    Field declared;
+    try {
+      declared = inClazz.getDeclaredField(s);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException("Field " + s + " not found in class " + inClazz, e);
+    }
+
+    // Primitive field types are described by their wrapper class.
+    TypeInfo info = new TypeInfo(declared.getName(), ClassUtils.primitiveToWrapper(declared.getType()));
+    info.getter = PojoUtils.createGetter(inClazz, info.name, info.type);
+    info.setter = PojoUtils.createSetter(projectedClazz, info.name, info.type);
+    projectedFields.add(info);
+  }
+
+  /**
+   * Looks up the field named {@code s} on the input class and appends its
+   * descriptor (name, wrapper type, getter on the input class, setter on the
+   * remainder class) to the remainderFields list.
+   *
+   * @param s name of a field declared on the input class
+   * @throws RuntimeException if the input class declares no such field
+   */
+  protected void addRemainderField(String s)
+  {
+    Field declared;
+    try {
+      declared = inClazz.getDeclaredField(s);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException("Field " + s + " not found in class " + inClazz, e);
+    }
+
+    // Primitive field types are described by their wrapper class.
+    TypeInfo info = new TypeInfo(declared.getName(), ClassUtils.primitiveToWrapper(declared.getType()));
+    info.getter = PojoUtils.createGetter(inClazz, info.name, info.type);
+    info.setter = PojoUtils.createSetter(remainderClazz, info.name, info.type);
+    remainderFields.add(info);
+  }
+
+  /**
+   * Builds the projected and remainder field lists from selectFields/dropFields.
+   *
+   * When selectFields names at least one field, those fields are projected and all
+   * other fields of the input class become the remainder. Otherwise the fields
+   * named in dropFields become the remainder and all other fields are projected.
+   * Remainder fields are only collected when a remainder class is known.
+   *
+   * Field lists are comma separated; whitespace around names is ignored, and a
+   * null, empty or blank list counts as "not specified". Compiler-generated
+   * (synthetic) fields of the input class are never treated as POJO fields.
+   *
+   * @param context operator context (not used)
+   * @throws RuntimeException if a listed field does not exist on the input class
+   */
+  @Override
+  public void activate(Context context)
+  {
+    final Field[] allFields = inClazz.getDeclaredFields();
+    final List<String> sFields = parseFieldNames(selectFields);
+
+    if (!sFields.isEmpty()) {
+      for (String s : sFields) {
+        addProjectedField(s);
+      }
+
+      if (remainderClazz != null) {
+        for (Field f : allFields) {
+          // Synthetic fields (e.g. added by instrumentation) have no accessors.
+          if (!f.isSynthetic() && !sFields.contains(f.getName())) {
+            addRemainderField(f.getName());
+          }
+        }
+      } else {
+        logger.info("Remainder Port does not have Schema class defined");
+      }
+    } else {
+      final List<String> dFields = parseFieldNames(dropFields);
+      if (!dFields.isEmpty()) {
+        if (remainderClazz != null) {
+          for (String s : dFields) {
+            addRemainderField(s);
+          }
+        } else {
+          logger.info("Remainder Port does not have Schema class defined");
+        }
+      }
+
+      for (Field f : allFields) {
+        // Synthetic fields (e.g. added by instrumentation) have no accessors.
+        if (!f.isSynthetic() && !dFields.contains(f.getName())) {
+          addProjectedField(f.getName());
+        }
+      }
+    }
+
+    logger.debug("projected fields: {}", projectedFields);
+    logger.debug("remainder fields: {}", remainderFields);
+  }
+
+  /**
+   * Splits a comma separated field list into names, ignoring whitespace around
+   * each name. Returns an empty list for a null, empty or blank input.
+   */
+  private static List<String> parseFieldNames(String csv)
+  {
+    if (csv == null) {
+      return new ArrayList<String>();
+    }
+    String trimmed = csv.trim();
+    if (trimmed.isEmpty()) {
+      return new ArrayList<String>();
+    }
+    // The regex swallows blanks on both sides of every comma.
+    return Arrays.asList(trimmed.split("\\s*,\\s*"));
+  }
+
+  /**
+   * Empties the projected and remainder field lists built by activate.
+   */
+  @Override
+  public void deactivate()
+  {
+    projectedFields.clear();
+    remainderFields.clear();
+  }
+
+  /**
+   * Resets the projected, remainder and error tuple counters to zero.
+   *
+   * @param windowId id of the window that is starting (not used)
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    projectedTuples = 0;
+    remainderTuples = 0;
+    errorTuples = 0;
+  }
+
+  /**
+   * Creates a new instance of the projected class and copies every projected
+   * field from the given input tuple into it.
+   *
+   * @param t input tuple to read the field values from
+   * @return the populated projected object
+   * @throws IllegalAccessException if the projected class cannot be instantiated
+   *         because the class or its no-arg constructor is not accessible
+   * @throws RuntimeException wrapping InstantiationException if instantiation fails
+   */
+  protected Object getProjectedObject(Object t) throws IllegalAccessException
+  {
+    Object target;
+    try {
+      target = projectedClazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    }
+
+    for (TypeInfo field : projectedFields) {
+      Object value = field.getter.get(t);
+      field.setter.set(target, value);
+    }
+    return target;
+  }
+
+  /**
+   * Creates a new instance of the remainder class and copies every remainder
+   * field from the given input tuple into it.
+   *
+   * @param t input tuple to read the field values from
+   * @return the populated remainder object
+   * @throws IllegalAccessException if the remainder class cannot be instantiated
+   *         because the class or its no-arg constructor is not accessible
+   * @throws RuntimeException wrapping InstantiationException if instantiation fails
+   */
+  protected Object getRemainderObject(Object t) throws IllegalAccessException
+  {
+    Object target;
+    try {
+      target = remainderClazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    }
+
+    for (TypeInfo field : remainderFields) {
+      Object value = field.getter.get(t);
+      field.setter.set(target, value);
+    }
+    return target;
+  }
+
+  /**
+   * Builds the projected tuple, and the remainder tuple when the remainder port
+   * is connected, then emits them and bumps the matching counters. If building
+   * either tuple raises IllegalAccessException, nothing is emitted on those ports;
+   * the unmodified input tuple goes to the error port and errorTuples is bumped.
+   */
+  private void handleProjection(Object t)
+  {
+    Object projectedTuple;
+    Object remainderTuple = null;
+    boolean wantRemainder;
+
+    try {
+      projectedTuple = getProjectedObject(t);
+      wantRemainder = remainder.isConnected();
+      if (wantRemainder) {
+        remainderTuple = getRemainderObject(t);
+      }
+    } catch (IllegalAccessException e) {
+      error.emit(t);
+      errorTuples++;
+      return;
+    }
+
+    // The remainder tuple is emitted before the projected tuple.
+    if (wantRemainder) {
+      remainder.emit(remainderTuple);
+      remainderTuples++;
+    }
+    projected.emit(projectedTuple);
+    projectedTuples++;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ProjectionOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/19029582/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java b/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java
new file mode 100644
index 0000000..f0b684f
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.projection;
+
+import java.util.Date;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for Projection related Activate method
+ */
+public class ActivateTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(ActivateTest.class);
+
+  /**
+   * Test POJO with three private fields (long, String, Date) and their accessors.
+   * The tests use it as input, projected and remainder class alike.
+   */
+  public static class DummyPOJO
+  {
+    private long l;
+    private String str;
+    private Date date;
+
+    public long getL()
+    {
+      return l;
+    }
+
+    public void setL(long l)
+    {
+      this.l = l;
+    }
+
+    public String getStr()
+    {
+      return str;
+    }
+
+    public void setStr(String str)
+    {
+      this.str = str;
+    }
+
+    public Date getDate()
+    {
+      return date;
+    }
+
+    public void setDate(Date date)
+    {
+      this.date = date;
+    }
+  }
+
+  private static ProjectionOperator projection;
+  private static DummyPOJO data;
+
+  /**
+   * With selectFields and dropFields both null, all three fields are projected
+   * and the remainder list stays empty.
+   */
+  @Test
+  public void testSelectDropFieldsNull()
+  {
+    logger.debug("start round 0");
+    projection.selectFields = null;
+    projection.dropFields = null;
+    try {
+      projection.activate(null);
+      Assert.assertEquals("projected fields", 3, projection.getProjectedFields().size());
+      Assert.assertEquals("remainder fields", 0, projection.getRemainderFields().size());
+    } finally {
+      // The operator is shared by all tests: clear its field lists even on failure.
+      projection.deactivate();
+    }
+    logger.debug("end round 0");
+  }
+
+  /**
+   * With selectFields and dropFields both empty, all three fields are projected
+   * and the remainder list stays empty.
+   */
+  @Test
+  public void testSelectDropFieldsEmpty()
+  {
+    logger.debug("start round 0");
+    projection.selectFields = "";
+    projection.dropFields = "";
+    try {
+      projection.activate(null);
+      Assert.assertEquals("projected fields", 3, projection.getProjectedFields().size());
+      Assert.assertEquals("remainder fields", 0, projection.getRemainderFields().size());
+    } finally {
+      // The operator is shared by all tests: clear its field lists even on failure.
+      projection.deactivate();
+    }
+    logger.debug("end round 0");
+  }
+
+  /**
+   * Selecting two of the three fields projects those two and leaves one
+   * field for the remainder.
+   */
+  @Test
+  public void testSelectFields()
+  {
+    logger.debug("start round 0");
+    projection.selectFields = "l,str";
+    projection.dropFields = "";
+    try {
+      projection.activate(null);
+      Assert.assertEquals("projected fields", 2, projection.getProjectedFields().size());
+      Assert.assertEquals("remainder fields", 1, projection.getRemainderFields().size());
+    } finally {
+      // The operator is shared by all tests: clear its field lists even on failure.
+      projection.deactivate();
+    }
+    logger.debug("end round 0");
+  }
+
+  /**
+   * Dropping two of the three fields sends those two to the remainder and
+   * projects the one field that is left.
+   */
+  @Test
+  public void testDropFields()
+  {
+    logger.debug("start round 0");
+    projection.selectFields = "";
+    projection.dropFields = "str,date";
+    try {
+      projection.activate(null);
+      Assert.assertEquals("projected fields", 1, projection.getProjectedFields().size());
+      Assert.assertEquals("remainder fields", 2, projection.getRemainderFields().size());
+    } finally {
+      // The operator is shared by all tests: clear its field lists even on failure.
+      projection.deactivate();
+    }
+    logger.debug("end round 0");
+  }
+
+  /**
+   * When both lists are given, selectFields wins: the two selected fields are
+   * projected and the one unselected field is the remainder, regardless of
+   * what dropFields says.
+   */
+  @Test
+  public void testBothFieldsSpecified()
+  {
+    logger.debug("start round 0");
+    projection.selectFields = "l,str";
+    projection.dropFields = "str,date";
+    try {
+      projection.activate(null);
+      Assert.assertEquals("projected fields", 2, projection.getProjectedFields().size());
+      Assert.assertEquals("remainder fields", 1, projection.getRemainderFields().size());
+    } finally {
+      // The operator is shared by all tests: clear its field lists even on failure.
+      projection.deactivate();
+    }
+    logger.debug("end round 0");
+  }
+
+  /**
+   * Creates the operator shared by all tests and sets its input, projected and
+   * remainder classes directly to DummyPOJO instead of going through port setup.
+   */
+  @BeforeClass
+  public static void setup()
+  {
+    data = new DummyPOJO();
+    projection = new ProjectionOperator();
+    projection.inClazz = DummyPOJO.class;
+    projection.projectedClazz = DummyPOJO.class;
+    projection.remainderClazz = DummyPOJO.class;
+  }
+
+  /**
+   * Calls teardown on the shared operator once all tests have run.
+   */
+  @AfterClass
+  public static void teardown()
+  {
+    projection.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/19029582/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java b/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java
new file mode 100644
index 0000000..a47b167
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.projection;
+
+import java.lang.reflect.Field;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Tests for ProjectionOperator
+ */
+public class ProjectionTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(ProjectionTest.class);
+
+  /**
+   * Input POJO with two long fields, "projected" and "remainder", and their accessors.
+   */
+  public static class DummyPOJO
+  {
+    private long projected;
+    private long remainder;
+
+    public long getProjected()
+    {
+      return projected;
+    }
+
+    public void setProjected(long projected)
+    {
+      this.projected = projected;
+    }
+
+    public long getRemainder()
+    {
+      return remainder;
+    }
+
+    public void setRemainder(long remainder)
+    {
+      this.remainder = remainder;
+    }
+  }
+
+  /**
+   * Output POJO for the projected side; holds only the long field "projected".
+   */
+  public static class ProjectedPOJO
+  {
+    private long projected;
+
+    public long getProjected()
+    {
+      return projected;
+    }
+
+    public void setProjected(long projected)
+    {
+      this.projected = projected;
+    }
+  }
+
+  /**
+   * Output POJO for the remainder side; holds only the long field "remainder".
+   */
+  public static class RemainderPOJO
+  {
+    private long remainder;
+
+    public long getRemainder()
+    {
+      return remainder;
+    }
+
+    public void setRemainder(long remainder)
+    {
+      this.remainder = remainder;
+    }
+  }
+
+  private static ProjectionOperator projection;
+  private static DummyPOJO data;
+
+  /**
+   * Reads a (possibly private) long field of the given object through reflection,
+   * after logging all of the object's declared fields at debug level.
+   *
+   * The looked-up Field is made accessible before it is read: every reflective
+   * lookup returns a fresh Field instance, so the setAccessible call in the
+   * logging loop does not carry over. A missing or unreadable field fails the
+   * test instead of silently yielding 0.
+   *
+   * @param p object to read from
+   * @param field name of a field declared on p's class
+   * @return the field value
+   */
+  public Long getFieldValue(Object p, String field)
+  {
+    for (Field f: p.getClass().getDeclaredFields()) {
+      f.setAccessible(true);
+      try {
+        logger.debug("{} field: {} type: {} val: {}", field, f.getName(), f.getType(), f.get(p));
+      } catch (IllegalAccessException e) {
+        logger.info("could not access value of field: {} type: {}", f.getName(), f.getType());
+      }
+    }
+
+    try {
+      Field target = p.getClass().getDeclaredField(field);
+      target.setAccessible(true);
+      return (Long)target.get(p);
+    } catch (NoSuchFieldException e) {
+      Assert.fail("no field '" + field + "' in " + p.getClass() + ": " + e);
+    } catch (IllegalAccessException e) {
+      Assert.fail("cannot read field '" + field + "' of " + p.getClass() + ": " + e);
+    }
+
+    // Not reached: Assert.fail always throws.
+    return Long.valueOf(0);
+  }
+
+  public void checkProjected(Object p, Integer val)
+  {
+    Long value = ((ProjectedPOJO)p).getProjected();
+
+    Assert.assertEquals("projected field value", new Long(val), value);
+  }
+
+  public void checkRemainder(Object r, Integer val)
+  {
+    Long value = ((RemainderPOJO)r).getRemainder();
+
+    Assert.assertEquals("remainder field value", new Long(val), value);
+  }
+
+  /**
+   * Builds both the projected and the remainder object from one input tuple and
+   * checks that each carries the matching input value.
+   *
+   * An IllegalAccessException now fails the test directly instead of being
+   * caught and followed by a NullPointerException on the unset result.
+   */
+  @Test
+  public void testProjectionRemainder() throws IllegalAccessException
+  {
+    logger.debug("start round 0");
+    projection.beginWindow(0);
+
+    data.setProjected(1234);
+    data.setRemainder(6789);
+
+    Object p = projection.getProjectedObject(data);
+    logger.debug("projected class {}", p.getClass());
+
+    Object r = projection.getRemainderObject(data);
+    logger.debug("remainder class {}", r.getClass());
+
+    checkProjected(p, 1234);
+    checkRemainder(r, 6789);
+
+    projection.endWindow();
+    logger.debug("end round 0");
+  }
+
+  /**
+   * Builds the projected object from an input tuple and checks its value.
+   *
+   * An IllegalAccessException now fails the test directly instead of being
+   * caught and followed by a NullPointerException on the unset result.
+   */
+  @Test
+  public void testProjected() throws IllegalAccessException
+  {
+    logger.debug("start round 0");
+    projection.beginWindow(0);
+
+    data.setProjected(2345);
+    data.setRemainder(5678);
+
+    Object p = projection.getProjectedObject(data);
+    logger.debug("projected class {}", p.getClass());
+
+    checkProjected(p, 2345);
+
+    projection.endWindow();
+    logger.debug("end round 0");
+  }
+
+  /**
+   * Builds the remainder object from an input tuple and checks its value.
+   *
+   * An IllegalAccessException now fails the test directly instead of being
+   * caught and followed by a NullPointerException on the unset result.
+   */
+  @Test
+  public void testRemainder() throws IllegalAccessException
+  {
+    logger.debug("start round 0");
+    projection.beginWindow(0);
+
+    data.setProjected(9876);
+    data.setRemainder(4321);
+
+    Object r = projection.getRemainderObject(data);
+    logger.debug("remainder class {}", r.getClass());
+
+    checkRemainder(r, 4321);
+
+    projection.endWindow();
+    logger.debug("end round 0");
+  }
+
+  /**
+   * Drives tuples through the input port in two windows: first with no sinks
+   * attached, then with collector sinks on the projected and remainder ports,
+   * checking the tuple counters and the collected tuples.
+   */
+  @Test
+  public void testProjection()
+  {
+    logger.debug("start round 0");
+    projection.beginWindow(0);
+
+    // No sink is attached to the remainder port yet, so no remainder tuple is counted.
+    projection.input.process(data);
+    Assert.assertEquals("projected tuples", 1, projection.projectedTuples);
+    Assert.assertEquals("remainder tuples", 0, projection.remainderTuples);
+
+    projection.endWindow();
+    logger.debug("end round 0");
+
+    CollectorTestSink projectedSink = new CollectorTestSink();
+    CollectorTestSink remainderSink = new CollectorTestSink();
+
+    projection.projected.setSink(projectedSink);
+    projection.remainder.setSink(remainderSink);
+
+    /* Collector Sink Test when remainder port is connected */
+    logger.debug("start round 1");
+    // beginWindow resets the counters, so the expected counts below are per window.
+    projection.beginWindow(1);
+
+    data.setProjected(4321);
+    data.setRemainder(9876);
+
+    projection.input.process(data);
+    Assert.assertEquals("projected tuples", 1, projection.projectedTuples);
+    Assert.assertEquals("remainder tuples", 1, projection.remainderTuples);
+
+    Object p = projectedSink.collectedTuples.get(0);
+    Object r = remainderSink.collectedTuples.get(0);
+
+    checkProjected(p, 4321);
+    checkRemainder(r, 9876);
+
+    projection.endWindow();
+    logger.debug("end round 1");
+  }
+
+  /**
+   * Creates the shared input tuple and operator, sets the input/projected/remainder
+   * classes directly, selects the "projected" field and activates the operator once
+   * for all tests.
+   */
+  @BeforeClass
+  public static void setup()
+  {
+    data = new DummyPOJO();
+    projection = new ProjectionOperator();
+    projection.inClazz = DummyPOJO.class;
+    projection.projectedClazz = ProjectedPOJO.class;
+    projection.remainderClazz = RemainderPOJO.class;
+
+
+    projection.selectFields = "projected";
+    projection.activate(null);
+  }
+
+  /**
+   * Deactivates and tears down the shared operator once all tests have run.
+   */
+  @AfterClass
+  public static void teardown()
+  {
+    projection.deactivate();
+    projection.teardown();
+  }
+}


[2/2] incubator-apex-malhar git commit: Merge branch 'master' of https://github.com/pradeepdalvi/incubator-apex-malhar

Posted by ch...@apache.org.
Merge branch 'master' of https://github.com/pradeepdalvi/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/79eeff78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/79eeff78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/79eeff78

Branch: refs/heads/master
Commit: 79eeff782f5cba09bda633dc760cd34190b694c4
Parents: c829ada 1902958
Author: chinmaykolhatkar <ch...@datatorrent.com>
Authored: Mon Mar 28 20:57:45 2016 +0530
Committer: chinmaykolhatkar <ch...@datatorrent.com>
Committed: Mon Mar 28 20:57:45 2016 +0530

----------------------------------------------------------------------
 .../lib/projection/ProjectionOperator.java      | 310 +++++++++++++++++++
 .../lib/projection/ActivateTest.java            | 160 ++++++++++
 .../lib/projection/ProjectionTest.java          | 276 +++++++++++++++++
 3 files changed, 746 insertions(+)
----------------------------------------------------------------------