You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/08 11:07:12 UTC
[apex-malhar] 03/05: Atomic file output app
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
commit ddbf26f00f5fa04774792294ccab7599667f1b77
Author: Chandni Singh <cs...@apache.org>
AuthorDate: Wed Mar 2 18:27:15 2016 -0800
Atomic file output app
---
.../com/example/myapexapp/AtomicFileOutputApp.java | 101 +++++++++++++++++++++
.../example/myapexapp/AtomicFileOutputAppTest.java | 93 +++++++++++++++++++
2 files changed, 194 insertions(+)
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java b/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
new file mode 100644
index 0000000..55f8cc6
--- /dev/null
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+package com.example.myapexapp;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Application that counts words read from Kafka and writes the counts to files via {@link FileWriter}, while also
+ * echoing them to the console.
+ */
+@ApplicationAnnotation(name = "AtomicFileOutput")
+public class AtomicFileOutputApp implements StreamingApplication
+{
+  /**
+   * Wires kafkaInput -> count -> {fileWriter, console}.
+   *
+   * @param dag           the DAG to populate
+   * @param configuration application configuration (not read directly by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput",
+        new KafkaSinglePortStringInputOperator());
+    // NOTE(review): presumably the FS-backed idempotent storage manager is what makes the input replay the same
+    // tuples per window after recovery (needed for exactly-once output) -- confirm against the operator docs.
+    kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
+    Application.UniqueCounterFlat count = dag.addOperator("count", new Application.UniqueCounterFlat());
+
+    FileWriter fileWriter = dag.addOperator("fileWriter", new FileWriter());
+
+    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
+    dag.addStream("words", kafkaInput.outputPort, count.data);
+    dag.addStream("counts", count.counts, fileWriter.input, cons.input);
+  }
+
+  /**
+   * An {@link AbstractFileOutputOperator} that writes every tuple to a single current file. When an application
+   * window ends without any tuples having been received and the current file has been written to, finalization of
+   * that file is requested, i.e., the file is completed and will not be opened again.
+   * <p/>
+   * The first file is named {@value #FILE_NAME_PREFIX}. Tuples that arrive after a hiatus are written to a new part
+   * file named {@code filestore.<part>}, where {@code part} starts at 1 and increases by one per finalized file.
+   */
+  public static class FileWriter extends AbstractFileOutputOperator<KeyValPair<String, Integer>>
+  {
+    /** Base name of the output file; part files append {@code ".<part>"}. */
+    static final String FILE_NAME_PREFIX = "filestore";
+
+    /** Index of the current part file. Non-transient, so it is operator state; the file name is derived from it. */
+    private int part;
+
+    /** Name of the file currently being written; recomputed from {@link #part} in {@link #setup}. */
+    private transient String currentFileName;
+
+    /** Whether at least one tuple was processed in the current application window. */
+    private transient boolean receivedTuples;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      // Rebuild the transient file name from the part number before the base class is set up.
+      currentFileName = (part == 0) ? FILE_NAME_PREFIX : FILE_NAME_PREFIX + "." + part;
+      super.setup(context);
+    }
+
+    /** Every tuple goes to the current file, regardless of its content. */
+    @Override
+    protected String getFileName(KeyValPair<String, Integer> keyValPair)
+    {
+      return currentFileName;
+    }
+
+    /** Serializes a tuple as its {@code toString()} followed by a newline, encoded as UTF-8. */
+    @Override
+    protected byte[] getBytesForTuple(KeyValPair<String, Integer> keyValPair)
+    {
+      // Explicit charset: the no-arg getBytes() depends on the JVM's platform default encoding.
+      return (keyValPair.toString() + "\n").getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      receivedTuples = false;
+    }
+
+    @Override
+    protected void processTuple(KeyValPair<String, Integer> tuple)
+    {
+      super.processTuple(tuple);
+      receivedTuples = true;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      // Request finalization if there is no input. This is done automatically if the file is rotated periodically
+      // or has a size threshold. Only the current file is checked: testing whether the offsets map is non-empty
+      // would, if offsets of previously finalized files are still tracked, finalize a never-written part file and
+      // advance the part number on every idle window.
+      if (!receivedTuples && endOffsets.containsKey(currentFileName)) {
+        requestFinalize(currentFileName);
+        part++;
+        currentFileName = FILE_NAME_PREFIX + "." + part;
+      }
+    }
+  }
+}
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
new file mode 100644
index 0000000..b539394
--- /dev/null
+++ b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
@@ -0,0 +1,93 @@
+package com.example.myapexapp;
+
+import java.io.File;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
+import com.datatorrent.contrib.kafka.KafkaTestProducer;
+
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+/**
+ * End-to-end test for {@link AtomicFileOutputApp}: starts an embedded ZooKeeper/Kafka, publishes a few words, runs
+ * the application in local mode and waits for the first output file to appear.
+ */
+public class AtomicFileOutputAppTest
+{
+  private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
+  private static final Logger LOG = LoggerFactory.getLogger(AtomicFileOutputAppTest.class);
+  private static final String KAFKA_TOPIC = "exactly-once-test";
+  private static final String TARGET_DIR = "target/atomicFileOutput";
+
+  /** Starts ZooKeeper and a Kafka broker in a clean per-class directory and creates the test topic. */
+  @Before
+  public void beforeTest() throws Exception
+  {
+    kafkaLauncher.baseDir = "target/" + this.getClass().getName();
+    FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
+    kafkaLauncher.startZookeeper();
+    kafkaLauncher.startKafkaServer();
+    kafkaLauncher.createTopic(0, KAFKA_TOPIC);
+  }
+
+  /** Stops the Kafka broker and ZooKeeper started in {@link #beforeTest()}. */
+  @After
+  public void afterTest()
+  {
+    kafkaLauncher.stopKafkaServer();
+    kafkaLauncher.stopZookeeper();
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      File targetDir = new File(TARGET_DIR);
+      FileUtils.deleteDirectory(targetDir);
+      FileUtils.forceMkdir(targetDir);
+
+      // produce some test data
+      KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
+      String[] words = "count the words from kafka and store them in the db".split("\\s+");
+      p.setMessages(Lists.newArrayList(words));
+      new Thread(p).start();
+
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
+      conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+      conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
+      conf.set("dt.operator.fileWriter.prop.filePath", TARGET_DIR);
+
+      lma.prepareDAG(new AtomicFileOutputApp(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync(); // test will terminate after results are available
+
+      // Shut the controller down even if the wait is interrupted or the assertion fails, so the application does
+      // not keep running while afterTest() stops Kafka.
+      try {
+        long timeout = System.currentTimeMillis() + 60000; // 60s timeout
+
+        File outputFile = new File(TARGET_DIR, AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX);
+        while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
+          Thread.sleep(1000);
+          LOG.debug("Waiting for {}", outputFile);
+        }
+
+        Assert.assertTrue("output file exists " + AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX,
+            outputFile.exists() && outputFile.isFile());
+      } finally {
+        lc.shutdown();
+      }
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}
--
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.