You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/12/21 02:06:30 UTC
[apex-malhar] branch master updated: APEXMALHAR-2034 Creating new
Avro Module in contrib
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new 600d435 APEXMALHAR-2034 Creating new Avro Module in contrib
600d435 is described below
commit 600d435c250397c6fe8db1a4e1ced276b20ac463
Author: SaumyaMohan <sa...@gmail.com>
AuthorDate: Wed Sep 13 12:32:07 2017 -0400
APEXMALHAR-2034 Creating new Avro Module in contrib
---
.../contrib/avro/AvroFileToPojoModule.java | 91 ++++++++
.../contrib/avro/AvroFileToPojoModuleTest.java | 260 +++++++++++++++++++++
2 files changed, 351 insertions(+)
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java
new file mode 100644
index 0000000..8ad00df
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+
+/**
+ * <p>
+ * Avro File To Pojo Module
+ * </p>
+ * This module emits Pojo based on the schema derived from the
+ * input file<br>
+ *
+ * Example of how to configure this module and add it to a DAG:
+ * <pre>{@code
+ * AvroFileToPojoModule avroFileToPojoModule = new AvroFileToPojoModule();
+ * avroFileToPojoModule.setPojoClass([className.class]);
+ * avroFileToPojoModule.setAvroFileDirectory(conf.get("[configuration property]", "[default file directory]"));
+ * avroFileToPojoModule = dag.addModule("avroFileToPojoModule", avroFileToPojoModule);
+ * }</pre>
+ * No need to provide a schema; it is inferred from the file.<br>
+ *
+ * Users can add the {@link FSWindowDataManager}
+ * to ensure exactly-once semantics with an HDFS-backed WAL.
+ *
+ * @displayName AvroFileToPojoModule
+ * @category Input
+ * @tags fs, file, avro, input operator, generic record, pojo
+ *
+ * @since
+ */
+public class AvroFileToPojoModule implements Module
+{
+  public final transient ProxyOutputPort<Object> output = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<GenericRecord> errorPort = new ProxyOutputPort<>();
+  // Proxies for the two ports re-exported from the AvroFileInputOperator.
+  public final transient ProxyOutputPort<String> completedAvroFilesPort = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<String> avroErrorRecordsPort = new ProxyOutputPort<>();
+
+  private AvroFileInputOperator avroFileInputOperator = new AvroFileInputOperator();
+  Class<?> pojoClass = null;
+
+  /** Adds the reader and the converter operators to the DAG, connects them and binds the proxy ports. */
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    AvroFileInputOperator fileReader = dag.addOperator("AvroFileInputOperator", this.avroFileInputOperator);
+    AvroToPojo pojoConverter = dag.addOperator("AvroGenericObjectToPojo", new AvroToPojo());
+    dag.setOutputPortAttribute(pojoConverter.output, Context.PortContext.TUPLE_CLASS, pojoClass);
+    DAG.StreamMeta readerToConverter = dag.addStream("avroFileContainerToPojo", fileReader.output, pojoConverter.data);
+    readerToConverter.setLocality(DAG.Locality.CONTAINER_LOCAL);
+    // Re-export the inner operators' output ports through this module's proxy ports.
+    completedAvroFilesPort.set(fileReader.completedFilesPort);
+    avroErrorRecordsPort.set(fileReader.errorRecordsPort);
+    output.set(pojoConverter.output);
+    errorPort.set(pojoConverter.errorPort);
+  }
+
+  /** Sets the class that populateDAG registers as TUPLE_CLASS on the converter's output port. */
+  public void setPojoClass(Class<?> pojoClass)
+  {
+    this.pojoClass = pojoClass;
+  }
+
+  /** Forwards the given directory to the wrapped AvroFileInputOperator via setDirectory. */
+  public void setAvroFileDirectory(String directory)
+  {
+    avroFileInputOperator.setDirectory(directory);
+  }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java
new file mode 100644
index 0000000..6965475
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.python.google.common.collect.Lists;
+
+import org.apache.apex.engine.EmbeddedAppLauncherImpl;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+/**
+ * <p>
+ * In this class the emitTuples method is called twice to process the first
+ * input, since on begin window 0 the operator is set up and the stream is initialized.
+ * The platform calls the emitTuples method in the successive windows.
+ * </p>
+ */
+public class AvroFileToPojoModuleTest
+{
+ // Avro schema of the Order record (orderId, customerId, total, customerName) used to build the test input.
+ private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\"," + "" + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\"," + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"}," + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}";
+ // Locations where writeAvroFile first creates each file before moving it into testMeta.dir.
+ private static final String FILENAME = "/tmp/simpleorder.avro";
+ private static final String OTHER_FILE = "/tmp/simpleorder2.avro";
+ // Module under test; configured in testAvroToPojoModule and handed to AvroToPojoApplication.
+ AvroFileToPojoModule avroFileToPojoModule = new AvroFileToPojoModule();
+ // Records built by createAvroInput and written out by writeAvroFile.
+ private List<GenericRecord> recordList = null;
+
+  /** JUnit watcher that prepares a per-test directory plus operator and port contexts, and deletes the output afterwards. */
+  public static class TestMeta extends TestWatcher
+  {
+    public String dir = null;
+    OperatorContext context;
+    PortContext portContext;
+    /** Derives dir from the test's class and method name and builds the two contexts around it. */
+    @Override
+    protected void starting(Description description)
+    {
+      dir = "target/" + description.getClassName() + "/" + description.getMethodName();
+      Attribute.AttributeMap operatorAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      operatorAttributes.put(Context.DAGContext.APPLICATION_PATH, dir);
+      context = mockOperatorContext(1, operatorAttributes);
+      Attribute.AttributeMap tupleAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      tupleAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class);
+      portContext = new TestPortContext(tupleAttributes);
+    }
+
+    /** Deletes the target/[class name] directory, wrapping any IOException in a RuntimeException. */
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+    }
+  }
+ // JUnit rule: TestMeta.starting assigns testMeta.dir for the test method, TestMeta.finished cleans up.
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+  /** Rebuilds recordList with cnt Order records, numbered cnt down to 1, whose field values derive from that number. */
+  private void createAvroInput(int cnt)
+  {
+    Schema orderSchema = new Schema.Parser().parse(AVRO_SCHEMA); // loop-invariant, so parsed once for all records
+    recordList = Lists.newArrayList();
+    for (int n = cnt; n > 0; n--) {
+      GenericRecord rec = new GenericData.Record(orderSchema);
+      rec.put("orderId", n * 1L);
+      rec.put("customerId", n * 2);
+      rec.put("total", n * 1.5);
+      rec.put("customerName", "*" + n + "*");
+      recordList.add(rec);
+    }
+  }
+
+  /** Writes recordList to outputFile as an Avro container file and then moves that file into testMeta.dir. */
+  private void writeAvroFile(File outputFile)
+  {
+    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(new Schema.Parser().parse(AVRO_SCHEMA));
+    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+      dataFileWriter.create(new Schema.Parser().parse(AVRO_SCHEMA), outputFile);
+      for (GenericRecord record : recordList) {
+        dataFileWriter.append(record);
+      }
+      dataFileWriter.close(); // flush buffered records before the move; the implicit second close() is a no-op
+      FileUtils.moveFileToDirectory(new File(outputFile.getAbsolutePath()), new File(testMeta.dir), true);
+    } catch (IOException e) {
+      throw new RuntimeException(e); // fail the test instead of continuing without its input file
+    }
+  }
+  /** Smoke test: runs the module on two Avro files in an embedded app; output is not asserted, only DAG validity. */
+  @Test
+  public void testAvroToPojoModule() throws Exception
+  {
+    try {
+      // Recursively delete the test directory so the run starts without leftover input.
+      FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+      // Two input files: the first with 7 records, the second with 5.
+      int cnt = 7;
+      createAvroInput(cnt);
+      writeAvroFile(new File(FILENAME));
+      createAvroInput(cnt - 2);
+      writeAvroFile(new File(OTHER_FILE));
+
+      avroFileToPojoModule.setAvroFileDirectory(testMeta.dir);
+      avroFileToPojoModule.setPojoClass(SimpleOrder.class);
+
+      EmbeddedAppLauncherImpl lma = new EmbeddedAppLauncherImpl();
+      Configuration conf = new Configuration(false);
+
+      AvroToPojoApplication avroToPojoApplication = new AvroToPojoApplication();
+      avroToPojoApplication.setAvroFileToPojoModule(avroFileToPojoModule);
+
+      lma.prepareDAG(avroToPojoApplication, conf);
+      EmbeddedAppLauncherImpl.Controller lc = lma.getController();
+      lc.run(10000);// runs for 10 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  /** Application that adds the configured module to the DAG and streams its output port into a ConsoleOutputOperator. */
+  public static class AvroToPojoApplication implements StreamingApplication
+  {
+    AvroFileToPojoModule avroFileToPojoModule;
+
+    public void setAvroFileToPojoModule(AvroFileToPojoModule avroFileToPojoModule)
+    {
+      this.avroFileToPojoModule = avroFileToPojoModule;
+    }
+
+    public AvroFileToPojoModule getAvroFileToPojoModule()
+    {
+      return this.avroFileToPojoModule;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      AvroFileToPojoModule module = dag.addModule("avroFileToPojoModule", getAvroFileToPojoModule());
+      ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+      dag.addStream("POJO", module.output, console.input);
+    }
+  }
+
+  /** POJO with the same four properties as the Order Avro schema; used as the tuple class in this test. */
+  public static class SimpleOrder
+  {
+    private Integer customerId;
+    private Long orderId;
+    private Double total;
+    private String customerName;
+
+    /** No-argument constructor; leaves every field null. */
+    public SimpleOrder()
+    {
+    }
+
+    /** Initializes all fields directly rather than through the overridable setters. */
+    public SimpleOrder(int customerId, long orderId, double total, String customerName)
+    {
+      this.customerId = customerId;
+      this.orderId = orderId;
+      this.total = total;
+      this.customerName = customerName;
+    }
+
+    public String getCustomerName()
+    {
+      return customerName;
+    }
+    public void setCustomerName(String customerName)
+    {
+      this.customerName = customerName;
+    }
+
+    public Integer getCustomerId()
+    {
+      return customerId;
+    }
+    public void setCustomerId(Integer customerId)
+    {
+      this.customerId = customerId;
+    }
+
+    public Long getOrderId()
+    {
+      return orderId;
+    }
+    public void setOrderId(Long orderId)
+    {
+      this.orderId = orderId;
+    }
+
+    public Double getTotal()
+    {
+      return total;
+    }
+    public void setTotal(Double total)
+    {
+      this.total = total;
+    }
+
+    /** Renders all four fields in a single bracketed line. */
+    @Override
+    public String toString()
+    {
+      return "SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName=" + customerName + "]";
+    }
+  }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].