You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by il...@apache.org on 2017/05/05 22:09:00 UTC

apex-malhar git commit: APEXMALHAR-2484 Support of PartFileWriter for writing the part files

Repository: apex-malhar
Updated Branches:
  refs/heads/master b5c003c94 -> 09a65c2f9


APEXMALHAR-2484 Support of PartFileWriter for writing the part files


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/09a65c2f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/09a65c2f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/09a65c2f

Branch: refs/heads/master
Commit: 09a65c2f9f9743cca79bb0b086bab89ca827401e
Parents: b5c003c
Author: chaitanya <ch...@apache.org>
Authored: Wed May 3 14:54:03 2017 +0530
Committer: Ilya Ganelin <il...@apache.org>
Committed: Fri May 5 15:08:35 2017 -0700

----------------------------------------------------------------------
 .../malhar/lib/io/block/PartFileWriter.java     | 146 ++++++++++++++++
 .../malhar/lib/io/block/PartFileWriterTest.java | 171 +++++++++++++++++++
 2 files changed, 317 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/09a65c2f/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java b/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java
new file mode 100644
index 0000000..010e8ff
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/io/block/PartFileWriter.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.io.block;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockWriter;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Writes every incoming block as its own "part" file under the configured output directory.
+ * <p>
+ * If f1 is a file of size 10 MB and the block size is 1 MB then this operator writes the blocks into the
+ * specified directory as f1.part1, f1.part2, ..., f1.part10. Here, the size of each part is 1 MB.
+ * The part number is the 1-based position of the block id inside the array returned by
+ * {@code FileMetadata.getBlockIds()}.
+ * <p>
+ * Blocks that arrive before the metadata of their file are parked in a waiting list. They are retried
+ * whenever {@link #handleIdleTime()} is invoked and once more in {@link #endWindow()}.
+ */
+public class PartFileWriter extends BlockWriter implements Operator.IdleTimeHandler
+{
+  // Separator placed between the relative file path and the part number.
+  // NOTE(review): this is effectively a constant but is not final; left untouched for compatibility.
+  protected static String PARTSUFFIX = ".part";
+  @NotNull
+  private String outputDirectoryPath;
+  // block id -> (1-based part number, relative path of the file the block belongs to)
+  private Map<Long, MutablePair<Integer, String>> blockInfo = new HashMap<>();
+  // Blocks received before the metadata of their file. Transient; re-created in setup().
+  private transient List<AbstractBlockReader.ReaderRecord<Slice>> waitingTuples;
+
+  /**
+   * Receives the metadata of a file and records, for each of its block ids, the part number and the
+   * relative path of the file.
+   */
+  public final transient DefaultInputPort<AbstractFileSplitter.FileMetadata> fileMetadataInput = new DefaultInputPort<AbstractFileSplitter.FileMetadata>()
+  {
+    @Override
+    public void process(AbstractFileSplitter.FileMetadata fileMetadata)
+    {
+      // Only the most recently received file is tracked: the mappings of the previous file are discarded.
+      // NOTE(review): blocks of the previous file that were not yet written or finalized lose their
+      // mapping here - confirm that metadata and blocks are always delivered file by file.
+      blockInfo.clear();
+      long[] blocks = fileMetadata.getBlockIds();
+      String relativePath = fileMetadata.getRelativePath();
+      for (int i = 0; i < blocks.length; i++) {
+        // Part numbers start at 1.
+        blockInfo.put(blocks[i], new MutablePair<>(i + 1, relativePath));
+      }
+    }
+  };
+
+  /**
+   * Runs the parent setup, uses the configured output directory as the {@code filePath} and creates
+   * the (transient) list of waiting blocks.
+   *
+   * @param context operator context, handed to the parent implementation
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    filePath = outputDirectoryPath;
+    waitingTuples = new LinkedList<>();
+  }
+
+  /**
+   * Writes the block if the metadata of its file is already known, otherwise parks it for a later retry.
+   *
+   * @param tuple block data to write
+   */
+  @Override
+  protected void processTuple(AbstractBlockReader.ReaderRecord<Slice> tuple)
+  {
+    // Check whether the fileMetadata of this blockid is received from fileMetadataInput port. If not, put it in waitingTuples.
+    if (blockInfo.get(tuple.getBlockId()) == null) {
+      waitingTuples.add(tuple);
+      return;
+    }
+    super.processTuple(tuple);
+  }
+
+  /**
+   * Returns the name of the part file for the given block.
+   *
+   * @param tuple block data
+   * @return relative path of the file followed by {@link #PARTSUFFIX} and the part number
+   * @throws IllegalStateException if no file metadata is known for the block
+   */
+  @Override
+  protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple)
+  {
+    return partFileName(tuple.getBlockId());
+  }
+
+  /**
+   * Retries the waiting blocks one last time, drops whatever is still unmatched and then runs the
+   * parent end-of-window handling.
+   */
+  @Override
+  public void endWindow()
+  {
+    processWaitBlocks();
+    // NOTE(review): blocks whose file metadata has still not arrived are discarded here without any
+    // trace - confirm that this is the intended behavior.
+    waitingTuples.clear();
+    super.endWindow();
+  }
+
+  /**
+   * Finalizes the part file that belongs to a block.
+   *
+   * @param fileName the block id in decimal notation; it is translated to the part file name before
+   *                 the parent implementation is invoked
+   * @throws IOException if the parent implementation fails to finalize the file
+   * @throws NumberFormatException if {@code fileName} is not a parsable long
+   * @throws IllegalStateException if no file metadata is known for the block
+   */
+  @Override
+  public void finalizeFile(String fileName) throws IOException
+  {
+    super.finalizeFile(partFileName(Long.parseLong(fileName)));
+  }
+
+  /**
+   * Uses idle time to write the waiting blocks whose file metadata has arrived in the meantime.
+   */
+  @Override
+  public void handleIdleTime()
+  {
+    processWaitBlocks();
+  }
+
+  /**
+   * Process the blocks which are in wait state: every waiting block whose file metadata is known by
+   * now is written and removed from the waiting list, the others stay in the list.
+   */
+  protected void processWaitBlocks()
+  {
+    Iterator<AbstractBlockReader.ReaderRecord<Slice>> waitIterator = waitingTuples.iterator();
+    while (waitIterator.hasNext()) {
+      AbstractBlockReader.ReaderRecord<Slice> blockData = waitIterator.next();
+      if (blockInfo.get(blockData.getBlockId()) != null) {
+        // The mapping was checked above, so the parent implementation is called directly.
+        super.processTuple(blockData);
+        // Removal goes through the iterator because the list is being iterated.
+        waitIterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Builds the part file name of a block: relative path + {@link #PARTSUFFIX} + part number.
+   *
+   * @param blockId id of the block
+   * @return name of the part file
+   * @throws IllegalStateException if no file metadata is known for the block
+   */
+  private String partFileName(long blockId)
+  {
+    MutablePair<Integer, String> partInfo = blockInfo.get(blockId);
+    if (partInfo == null) {
+      // Fail with the offending block id instead of an anonymous NullPointerException.
+      throw new IllegalStateException("No file metadata is known for block " + blockId
+        + "; unable to derive the name of its part file");
+    }
+    return partInfo.getRight() + PARTSUFFIX + partInfo.getLeft();
+  }
+
+  /**
+   * Return the path of output directory for storing part files
+   * @return outputDirectoryPath
+   */
+  public String getOutputDirectoryPath()
+  {
+    return outputDirectoryPath;
+  }
+
+  /**
+   * Specify the path of output directory for storing part files
+   * @param outputDirectoryPath given outputDirectoryPath
+   */
+  public void setOutputDirectoryPath(String outputDirectoryPath)
+  {
+    this.outputDirectoryPath = outputDirectoryPath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/09a65c2f/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java b/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java
new file mode 100644
index 0000000..784ab39
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/io/block/PartFileWriterTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.io.block;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.netlet.util.Slice;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+/**
+ * Tests that {@link PartFileWriter} writes every block of a file into a part file named
+ * {@code <relative path>.part<n>} and that each part file holds exactly the content of its block.
+ */
+public class PartFileWriterTest
+{
+
+  public static final int BLOCK_SIZE = 5;
+  public static final String FILE_NAME = "FILE";
+
+  // Content of the simulated files; file i gets the relative path FILE_NAME + i.
+  public static final String[] FILE_CONTENTS = {"abcdefgh", "pqrst", "xyz", "ABCDEFGHIJKLMNOPQRSTUVWXYZ",
+    "0123456789" };
+
+  /**
+   * Per-test fixture: creates the output directory and the operator under test, pre-computes the
+   * blocks and the metadata of every file and removes the output directory when the test is done.
+   */
+  private static class TestMeta extends TestWatcher
+  {
+    String outputPath;
+    //Maintain the sequence of records and process one by one through the input port of PartFileWriter.
+    List<List<AbstractBlockReader.ReaderRecord<Slice>>> blockDataList = new ArrayList<>(FILE_CONTENTS.length);
+    Map<Long, String> blockIdToExpectedContent = Maps.newHashMap();
+
+    PartFileWriter underTest;
+    File blocksDir;
+    Context.OperatorContext context;
+    List<AbstractFileSplitter.FileMetadata> fileMetadatas = new ArrayList<>();
+
+    /* (non-Javadoc)
+     * @see org.junit.rules.TestWatcher#starting(org.junit.runner.Description)
+     */
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      outputPath = new File("target/" + description.getClassName() + "/" + description.getMethodName()).getPath();
+
+      underTest = new PartFileWriter();
+      underTest.setOutputDirectoryPath(outputPath);
+
+      try {
+        // The part files are written directly into the output directory, so one directory is enough.
+        blocksDir = new File(outputPath);
+        FileUtils.forceMkdir(blocksDir);
+
+        populateBlocks();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Splits every entry of FILE_CONTENTS into blocks of at most BLOCK_SIZE characters. Block ids are
+     * consecutive across all files, starting at 1000. For every file the reader records, the expected
+     * content per block id and the file metadata (relative path, block ids, number of blocks) are stored.
+     */
+    protected void populateBlocks()
+    {
+      long blockId = 1000;
+      for (int i = 0; i < FILE_CONTENTS.length; i++) {
+        String fileContents = FILE_CONTENTS[i];
+        int fileLength = fileContents.length();
+        // Number of blocks, rounded up so that a shorter trailing block is counted as well.
+        int noOfBlocks = (fileLength + BLOCK_SIZE - 1) / BLOCK_SIZE;
+        long[] blockIds = new long[noOfBlocks];
+        List<AbstractBlockReader.ReaderRecord<Slice>> blockList = new ArrayList<>();
+
+        int blockIndex = 0;
+        for (int offset = 0; offset < fileLength; offset += BLOCK_SIZE, blockId++) {
+          // The last block of a file may be shorter than BLOCK_SIZE.
+          String blockContents = fileContents.substring(offset, Math.min(offset + BLOCK_SIZE, fileLength));
+
+          blockIds[blockIndex] = blockId;
+          blockIndex++;
+          blockIdToExpectedContent.put(blockId, blockContents);
+          blockList.add(new AbstractBlockReader.ReaderRecord<Slice>(blockId, new Slice(blockContents.getBytes())));
+        }
+        blockDataList.add(blockList);
+
+        AbstractFileSplitter.FileMetadata fileMetadata = new AbstractFileSplitter.FileMetadata(outputPath);
+        fileMetadata.setRelativePath(FILE_NAME + i);
+        fileMetadata.setBlockIds(blockIds);
+        fileMetadata.setNumberOfBlocks(noOfBlocks);
+        fileMetadatas.add(fileMetadata);
+      }
+    }
+
+    /* (non-Javadoc)
+     * @see org.junit.rules.TestWatcher#finished(org.junit.runner.Description)
+     */
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+
+      try {
+        FileUtils.deleteDirectory(new File(outputPath));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public PartFileWriterTest.TestMeta testMeta = new PartFileWriterTest.TestMeta();
+
+  /**
+   * Feeds one file per window (its metadata first, then all of its blocks) into the operator and
+   * checks that every file found in the output directory holds the content of the block that its
+   * name refers to.
+   */
+  @Test
+  public void testBlockWriting() throws IOException
+  {
+    Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.DAGContext.APPLICATION_ID, "PartitionWriterTest");
+    attributes.put(DAG.DAGContext.APPLICATION_PATH, testMeta.outputPath);
+    testMeta.context = mockOperatorContext(1, attributes);
+
+    testMeta.underTest.setup(testMeta.context);
+    for (int fileIndex = 0; fileIndex < FILE_CONTENTS.length; fileIndex++) {
+      AbstractFileSplitter.FileMetadata fileMetadata = testMeta.fileMetadatas.get(fileIndex);
+      testMeta.underTest.beginWindow(fileIndex);
+      testMeta.underTest.fileMetadataInput.process(fileMetadata);
+      for (int blockIndex = 0; blockIndex < fileMetadata.getNumberOfBlocks(); blockIndex++) {
+        testMeta.underTest.input.process(testMeta.blockDataList.get(fileIndex).get(blockIndex));
+      }
+      testMeta.underTest.endWindow();
+    }
+    testMeta.underTest.committed(2);
+    testMeta.underTest.teardown();
+
+    File[] blockFiles = testMeta.blocksDir.listFiles();
+    // listFiles() returns null when the directory cannot be listed, and without any file the loop
+    // below would verify nothing, so both cases have to fail the test explicitly.
+    Assert.assertNotNull("Unable to list " + testMeta.blocksDir, blockFiles);
+    Assert.assertTrue("No part files were written to " + testMeta.blocksDir, blockFiles.length > 0);
+
+    for (File blockFile : blockFiles) {
+      // File names look like FILE<fileIndex>.part<partNumber>
+      String[] nameParts = blockFile.getName().split("\\.");
+      int fileIndex = Integer.parseInt(nameParts[0].replace(FILE_NAME, ""));
+      //Ignore the PartFileWriter.PARTSUFFIX and trailing "."
+      int blockIndex = Integer.parseInt(nameParts[1].replace(PartFileWriter.PARTSUFFIX.substring(1), ""));
+      // Part numbers are 1-based, the block id array is 0-based.
+      String expected = testMeta.blockIdToExpectedContent.get(testMeta.fileMetadatas.get(fileIndex).getBlockIds()[blockIndex - 1]);
+      Assert.assertEquals(expected, FileUtils.readFileToString(blockFile));
+    }
+  }
+}