You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by il...@apache.org on 2017/05/05 22:09:00 UTC
apex-malhar git commit: APEXMALHAR-2484 Support of PartFileWriter for
writing the part files
Repository: apex-malhar
Updated Branches:
refs/heads/master b5c003c94 -> 09a65c2f9
APEXMALHAR-2484 Support of PartFileWriter for writing the part files
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/09a65c2f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/09a65c2f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/09a65c2f
Branch: refs/heads/master
Commit: 09a65c2f9f9743cca79bb0b086bab89ca827401e
Parents: b5c003c
Author: chaitanya <ch...@apache.org>
Authored: Wed May 3 14:54:03 2017 +0530
Committer: Ilya Ganelin <il...@apache.org>
Committed: Fri May 5 15:08:35 2017 -0700
----------------------------------------------------------------------
.../malhar/lib/io/block/PartFileWriter.java | 146 ++++++++++++++++
.../malhar/lib/io/block/PartFileWriterTest.java | 171 +++++++++++++++++++
2 files changed, 317 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/09a65c2f/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java b/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java
new file mode 100644
index 0000000..010e8ff
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.io.block;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockWriter;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Writes every incoming block as a separate "part" file into the specified output directory.
+ * <p>
+ * If f1 is a file of size 10 MB and the block size is 1 MB, then this operator writes the blocks into the
+ * specified directory as f1.part1, f1.part2, ..., f1.part10. Here, the size of each part is 1 MB.
+ * <p>
+ * The part number of a block is its 1-based position in the block id array of the
+ * {@link AbstractFileSplitter.FileMetadata} received on {@link #fileMetadataInput}. Blocks that arrive before
+ * the metadata of their file are kept in a waiting list and retried in {@link #handleIdleTime()} and in
+ * {@link #endWindow()}.
+ */
+public class PartFileWriter extends BlockWriter implements Operator.IdleTimeHandler
+{
+  /** Text placed between the relative path of the file and the 1-based part number. */
+  protected static final String PARTSUFFIX = ".part";
+  @NotNull
+  private String outputDirectoryPath;
+  /** Maps a block id to the pair (1-based part number, relative path of the file the block belongs to). */
+  private Map<Long, MutablePair<Integer, String>> blockInfo = new HashMap<>();
+  /** Blocks received before the metadata of their file; created in {@link #setup(Context.OperatorContext)}. */
+  private transient List<AbstractBlockReader.ReaderRecord<Slice>> waitingTuples;
+
+  /**
+   * Receives the metadata of a file and rebuilds the block id lookup from it. The previous content of the
+   * lookup is discarded first, so only the blocks of the most recently received file are known afterwards.
+   */
+  public final transient DefaultInputPort<AbstractFileSplitter.FileMetadata> fileMetadataInput = new DefaultInputPort<AbstractFileSplitter.FileMetadata>()
+  {
+    @Override
+    public void process(AbstractFileSplitter.FileMetadata fileMetadata)
+    {
+      blockInfo.clear();
+      long[] blocks = fileMetadata.getBlockIds();
+      String relativePath = fileMetadata.getRelativePath();
+      for (int i = 0; i < blocks.length; i++) {
+        // Part numbers start at 1, array positions at 0.
+        blockInfo.put(blocks[i], new MutablePair<>(i + 1, relativePath));
+      }
+    }
+  };
+
+  /**
+   * Runs the parent setup, points the inherited file path at the configured output directory and creates the
+   * waiting list.
+   * @param context operator context handed to the parent class
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    // NOTE(review): filePath is overwritten only after the parent setup has run - confirm that the parent does
+    // not keep using a value it derived from the earlier filePath.
+    filePath = outputDirectoryPath;
+    waitingTuples = new LinkedList<>();
+  }
+
+  /**
+   * Writes the block if the metadata of its file is already known, otherwise parks it in the waiting list.
+   * @param tuple block record to write
+   */
+  @Override
+  protected void processTuple(AbstractBlockReader.ReaderRecord<Slice> tuple)
+  {
+    // Check whether the fileMetadata of this blockid is received from fileMetadataInput port. If not, put it in waitingTuples.
+    if (blockInfo.get(tuple.getBlockId()) == null) {
+      waitingTuples.add(tuple);
+      return;
+    }
+    super.processTuple(tuple);
+  }
+
+  /**
+   * Returns the part file name of the given block.
+   * @param tuple block record
+   * @return relative path of the file, followed by {@link #PARTSUFFIX} and the 1-based part number
+   * @throws IllegalStateException if no file metadata is known for the block
+   */
+  @Override
+  protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple)
+  {
+    return partFileName(tuple.getBlockId());
+  }
+
+  /**
+   * Retries the waiting blocks one last time, discards the ones whose metadata is still unknown and then ends
+   * the window in the parent class.
+   */
+  @Override
+  public void endWindow()
+  {
+    processWaitBlocks();
+    // Whatever could not be matched by now is dropped; the waiting list does not survive the window.
+    waitingTuples.clear();
+    super.endWindow();
+  }
+
+  /**
+   * Finalizes the part file that belongs to a block.
+   * @param fileName decimal block id; it is translated into the part file name before delegating to the parent
+   * @throws IOException propagated from the parent implementation
+   * @throws NumberFormatException if fileName is not a decimal long
+   * @throws IllegalStateException if no file metadata is known for the block
+   */
+  @Override
+  public void finalizeFile(String fileName) throws IOException
+  {
+    super.finalizeFile(partFileName(Long.parseLong(fileName)));
+  }
+
+  /**
+   * Uses idle time to retry the blocks in the waiting list.
+   */
+  @Override
+  public void handleIdleTime()
+  {
+    processWaitBlocks();
+  }
+
+  /**
+   * Process the blocks which are in wait state. A block is written and removed from the waiting list as soon
+   * as the metadata of its file is known; all other blocks stay in the list.
+   */
+  protected void processWaitBlocks()
+  {
+    Iterator<AbstractBlockReader.ReaderRecord<Slice>> waitIterator = waitingTuples.iterator();
+    while (waitIterator.hasNext()) {
+      AbstractBlockReader.ReaderRecord<Slice> blockData = waitIterator.next();
+      if (blockInfo.get(blockData.getBlockId()) != null) {
+        super.processTuple(blockData);
+        waitIterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Builds the part file name of a block from the lookup filled by {@link #fileMetadataInput}.
+   * @param blockId id of the block
+   * @return relative path of the file, followed by {@link #PARTSUFFIX} and the 1-based part number
+   * @throws IllegalStateException if the lookup has no entry for the block
+   */
+  private String partFileName(long blockId)
+  {
+    MutablePair<Integer, String> info = blockInfo.get(blockId);
+    if (info == null) {
+      // Fail with the offending id instead of a bare NullPointerException.
+      throw new IllegalStateException("No file metadata is known for block " + blockId);
+    }
+    return info.getRight() + PARTSUFFIX + info.getLeft();
+  }
+
+  /**
+   * Return the path of output directory for storing part files
+   * @return outputDirectoryPath
+   */
+  public String getOutputDirectoryPath()
+  {
+    return outputDirectoryPath;
+  }
+
+  /**
+   * Specify the path of output directory for storing part files
+   * @param outputDirectoryPath given outputDirectoryPath
+   */
+  public void setOutputDirectoryPath(String outputDirectoryPath)
+  {
+    this.outputDirectoryPath = outputDirectoryPath;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/09a65c2f/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java b/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java
new file mode 100644
index 0000000..784ab39
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.io.block;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.netlet.util.Slice;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+/**
+ * Test for {@link PartFileWriter}: sends file metadata and block records to the operator, one file per
+ * window, and compares the content of every file found in the output directory with the expected block.
+ */
+public class PartFileWriterTest
+{
+
+  public static final int BLOCK_SIZE = 5;
+  public static final String FILE_NAME = "FILE";
+
+  public static final String[] FILE_CONTENTS = {"abcdefgh", "pqrst", "xyz", "ABCDEFGHIJKLMNOPQRSTUVWXYZ",
+    "0123456789" };
+
+  // Blocks are encoded and read back with the same explicit charset instead of the platform default.
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+  /**
+   * Per-test fixture: creates the operator and the output directory, and splits {@link #FILE_CONTENTS} into
+   * blocks of at most {@link #BLOCK_SIZE} characters together with the matching file metadata.
+   */
+  private static class TestMeta extends TestWatcher
+  {
+    String outputPath;
+    //Maintain the sequence of records and process one by one through the input port of PartFileWriter.
+    List<List<AbstractBlockReader.ReaderRecord<Slice>>> blockDataList = new ArrayList<>(FILE_CONTENTS.length);
+    Map<Long, String> blockIdToExpectedContent = Maps.newHashMap();
+
+    PartFileWriter underTest;
+    File blocksDir;
+    Context.OperatorContext context;
+    List<AbstractFileSplitter.FileMetadata> fileMetadatas = new ArrayList<>();
+
+    /* (non-Javadoc)
+     * @see org.junit.rules.TestWatcher#starting(org.junit.runner.Description)
+     */
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      outputPath = new File("target/" + description.getClassName() + "/" + description.getMethodName()).getPath();
+
+      underTest = new PartFileWriter();
+      underTest.setOutputDirectoryPath(outputPath);
+
+      try {
+        // outputPath is both the output directory given to the operator and the directory the test scans.
+        blocksDir = new File(outputPath);
+        FileUtils.forceMkdir(blocksDir);
+
+        populateBlocks();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Cuts every entry of FILE_CONTENTS into blocks of at most BLOCK_SIZE characters. Block ids are assigned
+     * consecutively starting at 1000 and continue across files. For each file the reader records, the expected
+     * content per block id and the file metadata (relative path FILE_NAME + index) are stored.
+     */
+    protected void populateBlocks()
+    {
+      long blockId = 1000;
+      for (int i = 0; i < FILE_CONTENTS.length; i++) {
+        int blockIndex = 0;
+        List<AbstractBlockReader.ReaderRecord<Slice>> blockList = new ArrayList<>();
+        AbstractFileSplitter.FileMetadata fileMetadata = new AbstractFileSplitter.FileMetadata(outputPath);
+        fileMetadata.setRelativePath(FILE_NAME + i);
+        String fileContents = FILE_CONTENTS[i];
+        int fileLength = fileContents.length();
+        // Round up so that a shorter last block is counted as well.
+        int noOfBlocks = ((fileLength / BLOCK_SIZE) + (((fileLength % BLOCK_SIZE) == 0) ? 0 : 1));
+        long[] blockIds = new long[noOfBlocks];
+        for (int offset = 0; offset < fileLength; offset += BLOCK_SIZE, blockId++) {
+          // The last block ends at the end of the file, all others after BLOCK_SIZE characters.
+          String blockContents = fileContents.substring(offset, Math.min(offset + BLOCK_SIZE, fileLength));
+
+          AbstractBlockReader.ReaderRecord<Slice> readerRecord = new AbstractBlockReader.ReaderRecord<Slice>(blockId, new Slice(blockContents.getBytes(UTF_8)));
+          blockIds[blockIndex] = blockId;
+          blockIndex++;
+          blockIdToExpectedContent.put(blockId, blockContents);
+          blockList.add(readerRecord);
+        }
+        blockDataList.add(blockList);
+        fileMetadata.setBlockIds(blockIds);
+        fileMetadata.setNumberOfBlocks(noOfBlocks);
+        fileMetadatas.add(fileMetadata);
+      }
+    }
+
+    /* (non-Javadoc)
+     * @see org.junit.rules.TestWatcher#finished(org.junit.runner.Description)
+     */
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+
+      try {
+        FileUtils.deleteDirectory(new File(outputPath));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public PartFileWriterTest.TestMeta testMeta = new PartFileWriterTest.TestMeta();
+
+  /**
+   * Processes each file in its own window (metadata first, then its blocks) and verifies that every file in
+   * the output directory holds exactly the content of the block its name refers to.
+   */
+  @Test
+  public void testBlockWriting() throws IOException
+  {
+    Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.DAGContext.APPLICATION_ID, "PartitionWriterTest");
+    attributes.put(DAG.DAGContext.APPLICATION_PATH, testMeta.outputPath);
+    testMeta.context = mockOperatorContext(1, attributes);
+
+    testMeta.underTest.setup(testMeta.context);
+    for (int fileIndex = 0; fileIndex < FILE_CONTENTS.length; fileIndex++) {
+      testMeta.underTest.beginWindow(fileIndex);
+      testMeta.underTest.fileMetadataInput.process(testMeta.fileMetadatas.get(fileIndex));
+      for (int blockIndex = 0; blockIndex < testMeta.fileMetadatas.get(fileIndex).getNumberOfBlocks(); blockIndex++) {
+        testMeta.underTest.input.process(testMeta.blockDataList.get(fileIndex).get(blockIndex));
+      }
+      testMeta.underTest.endWindow();
+    }
+    testMeta.underTest.committed(2);
+    testMeta.underTest.teardown();
+    File[] blockFileNames = testMeta.blocksDir.listFiles();
+    // Without these checks an empty or unreadable directory would let the test pass without verifying anything.
+    Assert.assertNotNull("Cannot list " + testMeta.blocksDir, blockFileNames);
+    Assert.assertTrue("No part files found in " + testMeta.blocksDir, blockFileNames.length > 0);
+    for (File blockFile : blockFileNames) {
+      // Expected name layout: FILE<fileIndex>.part<partNumber>[...]
+      String[] nameParts = blockFile.getName().split("\\.");
+      int fileIndex = Integer.parseInt(nameParts[0].replaceAll(FILE_NAME, ""));
+      //Ignore the leading "." of PartFileWriter.PARTSUFFIX; removing the rest leaves the 1-based part number.
+      int blockIndex = Integer.parseInt(nameParts[1].replaceAll(PartFileWriter.PARTSUFFIX.substring(1), ""));
+      String expected = testMeta.blockIdToExpectedContent.get(testMeta.fileMetadatas.get(fileIndex).getBlockIds()[blockIndex - 1]);
+      Assert.assertEquals(expected, FileUtils.readFileToString(blockFile, UTF_8.name()));
+    }
+  }
+}