You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/12/21 02:06:30 UTC

[apex-malhar] branch master updated: APEXMALHAR-2034 Creating new Avro Module in contrib

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new 600d435  APEXMALHAR-2034 Creating new Avro Module in contrib
600d435 is described below

commit 600d435c250397c6fe8db1a4e1ced276b20ac463
Author: SaumyaMohan <sa...@gmail.com>
AuthorDate: Wed Sep 13 12:32:07 2017 -0400

    APEXMALHAR-2034 Creating new Avro Module in contrib
---
 .../contrib/avro/AvroFileToPojoModule.java         |  91 ++++++++
 .../contrib/avro/AvroFileToPojoModuleTest.java     | 260 +++++++++++++++++++++
 2 files changed, 351 insertions(+)

diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java
new file mode 100644
index 0000000..8ad00df
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+
+/**
+ * <p>
+ * Avro File To Pojo Module
+ * </p>
+ * This module emits POJOs based on the schema derived from the
+ * input file.<br>
+ *
+ * Example of how to configure and add this module to a DAG:
+ *
+ * AvroFileToPojoModule avroFileToPojoModule = new AvroFileToPojoModule();
+ * avroFileToPojoModule.setPojoClass([className.class]);
+ * avroFileToPojoModule.setAvroFileDirectory(conf.get("[configuration property]", "[default file directory]"));
+ * avroFileToPojoModule = dag.addModule("avroFileToPojoModule", avroFileToPojoModule);
+ *
+ * There is no need to provide a schema; it is inferred from the file.<br>
+ *
+ * Users can add the {@link FSWindowDataManager}
+ * to ensure exactly-once semantics with an HDFS-backed WAL.
+ *
+ * @displayName AvroFileToPojoModule
+ * @category Input
+ * @tags fs, file, avro, input operator, generic record, pojo
+ *
+ * @since
+ */
+public class AvroFileToPojoModule implements Module
+{
+  public final transient ProxyOutputPort<Object> output = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<GenericRecord> errorPort = new ProxyOutputPort<>();
+  // Output ports proxied from the internal AvroFileInputOperator
+  public final transient ProxyOutputPort<String> completedAvroFilesPort = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<String> avroErrorRecordsPort = new ProxyOutputPort<>();
+
+  private AvroFileInputOperator avroFileInputOperator = new AvroFileInputOperator();
+  Class<?> pojoClass = null;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    AvroFileInputOperator avroFileInputOperator = dag.addOperator("AvroFileInputOperator", this.avroFileInputOperator);
+    AvroToPojo avroToPojo = dag.addOperator("AvroGenericObjectToPojo", new AvroToPojo());
+
+    dag.setOutputPortAttribute(avroToPojo.output, Context.PortContext.TUPLE_CLASS, pojoClass);
+
+    dag.addStream("avroFileContainerToPojo", avroFileInputOperator.output, avroToPojo.data)
+        .setLocality(DAG.Locality.CONTAINER_LOCAL);
+
+    output.set(avroToPojo.output);
+    errorPort.set(avroToPojo.errorPort);
+
+    completedAvroFilesPort.set(avroFileInputOperator.completedFilesPort);
+    avroErrorRecordsPort.set(avroFileInputOperator.errorRecordsPort);
+  }
+
+  public void setPojoClass(Class<?> pojoClass)
+  {
+    this.pojoClass = pojoClass;
+  }
+
+  public void setAvroFileDirectory(String directory)
+  {
+    avroFileInputOperator.setDirectory(directory);
+  }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java
new file mode 100644
index 0000000..6965475
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.python.google.common.collect.Lists;
+
+import org.apache.apex.engine.EmbeddedAppLauncherImpl;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+/**
+ * <p>
+ * In this class the emitTuples method is called twice to process the first
+ * input, since on begin window 0 the operator is set up and the stream is
+ * initialized. The platform calls emitTuples in the successive windows.
+ * </p>
+ */
+public class AvroFileToPojoModuleTest
+{
+
+  private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\"," + "" + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\"," + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"}," + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}";
+
+  private static final String FILENAME = "/tmp/simpleorder.avro";
+  private static final String OTHER_FILE = "/tmp/simpleorder2.avro";
+
+  AvroFileToPojoModule avroFileToPojoModule = new AvroFileToPojoModule();
+
+  private List<GenericRecord> recordList = null;
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String dir = null;
+    OperatorContext context;
+    PortContext portContext;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      this.dir = "target/" + className + "/" + methodName;
+      Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(Context.DAGContext.APPLICATION_PATH, dir);
+      context = mockOperatorContext(1, attributes);
+      Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class);
+      portContext = new TestPortContext(portAttributes);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  private void createAvroInput(int cnt)
+  {
+    recordList = Lists.newArrayList();
+
+    while (cnt > 0) {
+      GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA));
+      rec.put("orderId", cnt * 1L);
+      rec.put("customerId", cnt * 2);
+      rec.put("total", cnt * 1.5);
+      rec.put("customerName", "*" + cnt + "*");
+      cnt--;
+      recordList.add(rec);
+    }
+  }
+
+  private void writeAvroFile(File outputFile)
+  {
+    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(new Schema.Parser().parse(AVRO_SCHEMA));
+    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+      dataFileWriter.create(new Schema.Parser().parse(AVRO_SCHEMA), outputFile);
+
+      for (GenericRecord record : recordList) {
+        dataFileWriter.append(record);
+      }
+      FileUtils.moveFileToDirectory(new File(outputFile.getAbsolutePath()), new File(testMeta.dir), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testAvroToPojoModule() throws Exception
+  {
+    try {
+      FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+      int cnt = 7;
+      createAvroInput(cnt);
+      writeAvroFile(new File(FILENAME));
+      createAvroInput(cnt - 2);
+      writeAvroFile(new File(OTHER_FILE));
+
+      avroFileToPojoModule.setAvroFileDirectory(testMeta.dir);
+      avroFileToPojoModule.setPojoClass(SimpleOrder.class);
+
+      AvroToPojo avroToPojo = new AvroToPojo();
+      avroToPojo.setPojoClass(SimpleOrder.class);
+
+      EmbeddedAppLauncherImpl lma = new EmbeddedAppLauncherImpl();
+      Configuration conf = new Configuration(false);
+
+      AvroToPojoApplication avroToPojoApplication = new AvroToPojoApplication();
+      avroToPojoApplication.setAvroFileToPojoModule(avroFileToPojoModule);
+
+      lma.prepareDAG(avroToPojoApplication, conf);
+      EmbeddedAppLauncherImpl.Controller lc = lma.getController();
+      lc.run(10000);// runs for 10 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  public static class AvroToPojoApplication implements StreamingApplication
+  {
+    AvroFileToPojoModule avroFileToPojoModule;
+
+    public AvroFileToPojoModule getAvroFileToPojoModule()
+    {
+      return avroFileToPojoModule;
+    }
+
+    public void setAvroFileToPojoModule(AvroFileToPojoModule avroFileToPojoModule)
+    {
+      this.avroFileToPojoModule = avroFileToPojoModule;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      AvroFileToPojoModule avroFileToPojoModule = dag.addModule("avroFileToPojoModule", getAvroFileToPojoModule());
+      ConsoleOutputOperator consoleOutput = dag.addOperator("console", new ConsoleOutputOperator());
+
+      dag.addStream("POJO", avroFileToPojoModule.output, consoleOutput.input);
+    }
+  }
+
+  public static class SimpleOrder
+  {
+    private Integer customerId;
+    private Long orderId;
+    private Double total;
+    private String customerName;
+
+    public SimpleOrder()
+    {
+    }
+
+    public SimpleOrder(int customerId, long orderId, double total, String customerName)
+    {
+      setCustomerId(customerId);
+      setOrderId(orderId);
+      setTotal(total);
+      setCustomerName(customerName);
+    }
+
+    public String getCustomerName()
+    {
+      return customerName;
+    }
+
+    public void setCustomerName(String customerName)
+    {
+      this.customerName = customerName;
+    }
+
+    public Integer getCustomerId()
+    {
+      return customerId;
+    }
+
+    public void setCustomerId(Integer customerId)
+    {
+      this.customerId = customerId;
+    }
+
+    public Long getOrderId()
+    {
+      return orderId;
+    }
+
+    public void setOrderId(Long orderId)
+    {
+      this.orderId = orderId;
+    }
+
+    public Double getTotal()
+    {
+      return total;
+    }
+
+    public void setTotal(Double total)
+    {
+      this.total = total;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName=" + customerName + "]";
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].