You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ra...@apache.org on 2016/05/17 15:51:37 UTC

[1/2] incubator-apex-malhar git commit: APEXMALHAR-2077 AbstractSingleFileOutputOperator appending partitionID

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master f94c2d8d5 -> 1d3e20c69


APEXMALHAR-2077 AbstractSingleFileOutputOperator appending partitionID

1. Renamed HDFSOutputOperator to BytesFileOutputOperator
2. Moved partitionID related logic into AbstractSingleFileOutputOperator
3. BytesFileOutputOperator extends AbstractSingleFileOutputOperator

4. Renamed currentPartName to partitionedFileName

5. Updated partitionedFileName from field setters

6. Fixed unit tests

7. Incorporated review comments

8. Fixed checkstyle violation


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/67ad24ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/67ad24ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/67ad24ed

Branch: refs/heads/master
Commit: 67ad24edd6dc1629501e86168d2594606c52e291
Parents: 2459f6c
Author: yogidevendra <de...@datatorrent.com>
Authored: Fri May 13 11:33:53 2016 +0530
Committer: yogidevendra <de...@datatorrent.com>
Committed: Tue May 17 18:46:09 2016 +0530

----------------------------------------------------------------------
 .../io/fs/AbstractSingleFileOutputOperator.java |  65 ++-
 .../malhar/lib/fs/BytesFileOutputOperator.java  | 295 ++++++++++++++
 .../apex/malhar/lib/fs/HDFSOutputOperator.java  | 394 -------------------
 .../AbstractSingleFileOutputOperatorTest.java   |   4 +-
 .../lib/fs/BytesFileOutputOperatorTest.java     | 151 +++++++
 .../malhar/lib/fs/HDFSOutputOperatorTest.java   | 151 -------
 6 files changed, 513 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/67ad24ed/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java
index 1844fb2..3b60d4a 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java
@@ -20,6 +20,10 @@ package com.datatorrent.lib.io.fs;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.commons.lang.StringUtils;
+
+import com.datatorrent.api.Context.OperatorContext;
+
 /**
  * This is a simple class that output all tuples to a single file.
  *
@@ -39,10 +43,41 @@ public abstract class AbstractSingleFileOutputOperator<INPUT> extends AbstractFi
   @NotNull
   protected String outputFileName;
 
+  /**
+   * String.format pattern for partitionedFileName, applied to (outputFileName, physicalPartitionId),
+   * e.g. fileName_physicalPartitionId -> %s_%d. Null or empty means no partition suffix.
+   */
+  private String partitionedFileNameformat = "%s_%d";
+
+  /**
+   * File name returned by getFileName(); derived in setup() from outputFileName and physicalPartitionId
+   */
+  private transient String partitionedFileName;
+
+  /**
+   * Physical partition id for the current partition, taken from the operator context in setup().
+   */
+  private transient int physicalPartitionId;
+
+  /**
+   * {@inheritDoc} Records the physical partition id and derives partitionedFileName:
+   * outputFileName formatted with partitionedFileNameformat, or outputFileName itself
+   * when the format is null or empty.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    physicalPartitionId = context.getId();
+    boolean suffixed = !StringUtils.isEmpty(partitionedFileNameformat);
+    partitionedFileName = suffixed
+        ? String.format(partitionedFileNameformat, outputFileName, physicalPartitionId) : outputFileName;
+  }
+
   @Override
   protected String getFileName(INPUT tuple)
   {
-    return outputFileName;
+    return partitionedFileName;
   }
 
   /**
@@ -62,4 +97,32 @@ public abstract class AbstractSingleFileOutputOperator<INPUT> extends AbstractFi
   {
     return outputFileName;
   }
+
+  /**
+   * @return String.format pattern used in setup() to build the partitioned file name
+   */
+  public String getPartitionedFileNameformat()
+  {
+    return this.partitionedFileNameformat;
+  }
+
+  /**
+   * @param partitionedFileNameformat
+   *          String.format pattern applied to (outputFileName, physicalPartitionId) in setup(),
+   *          e.g. fileName_physicalPartitionId -> %s_%d. Null or empty disables the suffix.
+   */
+  public void setPartitionedFileNameformat(String partitionedFileNameformat)
+  {
+    this.partitionedFileNameformat = partitionedFileNameformat;
+  }
+
+  /**
+   * @return the file name all tuples are written to, derived from outputFileName and
+   *         physicalPartitionId in setup(); null before setup() has run
+   */
+  public String getPartitionedFileName()
+  {
+    return this.partitionedFileName;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/67ad24ed/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java
new file mode 100644
index 0000000..82f038f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperator.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Writes byte[] and String tuples to a single file per partition (see
+ * AbstractSingleFileOutputOperator#getPartitionedFileName()), appending tupleSeparator
+ * after every tuple. In addition to the rotation configured on the base class, the
+ * current part file is rotated after maxTupleCount tuples or after maxIdleWindows
+ * consecutive application windows without new data.
+ */
+
+class BytesFileOutputOperator extends AbstractSingleFileOutputOperator<byte[]>
+{
+
+  /**
+   * Flag to mark if new data in current application window
+   */
+  private transient boolean isNewDataInCurrentWindow;
+
+  /**
+   * Separator between the tuples
+   */
+  private String tupleSeparator;
+
+  /**
+   * byte[] representation of tupleSeparator
+   */
+  private transient byte[] tupleSeparatorBytes;
+
+  /**
+   * No. of bytes (tuple plus separator) produced in current application window
+   */
+  @AutoMetric
+  private long byteCount;
+
+  /**
+   * No. of tuples present in current part for file
+   */
+  private long currentPartTupleCount;
+
+  /**
+   * Max. number of tuples allowed per part. Part file will be finalized after
+   * these many tuples
+   */
+  private long maxTupleCount = Long.MAX_VALUE;
+
+  /**
+   * No. of windows since last new data received
+   */
+  private long currentPartIdleWindows;
+
+  /**
+   * Max number of idle windows for which no new data is added to current part
+   * file. Part file will be finalized after these many idle windows after last
+   * new data.
+   */
+  private long maxIdleWindows = Long.MAX_VALUE;
+
+  /**
+   * Stream codec for string input port
+   */
+  protected StreamCodec<String> stringStreamCodec;
+
+  /**
+   * Default value for stream expiry
+   */
+  private static final long DEFAULT_STREAM_EXPIRY_ACCESS_MILL = 60 * 60 * 1000L; //1 hour
+
+  /**
+   * Default value for rotation windows
+   */
+  private static final int DEFAULT_ROTATION_WINDOWS = 2 * 60 * 10; //10 min, assuming 2 windows per second
+
+  /**
+   * Initializing default values for tuple separator, stream expiry, rotation
+   * windows
+   */
+  public BytesFileOutputOperator()
+  {
+    setTupleSeparator(System.getProperty("line.separator"));
+    setExpireStreamAfterAccessMillis(DEFAULT_STREAM_EXPIRY_ACCESS_MILL);
+    setRotationWindows(DEFAULT_ROTATION_WINDOWS);
+  }
+
+  /**
+   * Input port for receiving string tuples; converted to bytes with the platform default charset.
+   */
+  public final transient DefaultInputPort<String> stringInput = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String tuple)
+    {
+      processTuple(tuple.getBytes());
+    }
+
+    @Override
+    public StreamCodec<String> getStreamCodec()
+    {
+      if (BytesFileOutputOperator.this.stringStreamCodec == null) {
+        return super.getStreamCodec();
+      } else {
+        return stringStreamCodec;
+      }
+    }
+  };
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return a new array holding the bytes of the tuple followed by
+   *         tupleSeparatorBytes. The length of the returned array is also
+   *         added to byteCount for the current application window.
+   */
+  @Override
+  protected byte[] getBytesForTuple(byte[] tuple)
+  {
+    ByteArrayOutputStream bytesOutStream = new ByteArrayOutputStream();
+
+    try {
+      bytesOutStream.write(tuple);
+      bytesOutStream.write(tupleSeparatorBytes);
+      byteCount += bytesOutStream.size();
+      return bytesOutStream.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        bytesOutStream.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Initializing per window level fields {@inheritDoc}
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    byteCount = 0;
+    isNewDataInCurrentWindow = false;
+  }
+
+  /**
+   * {@inheritDoc} Also rotates the part file once it holds maxTupleCount tuples;
+   * {@code >=} lets a lowered limit take effect, a non-positive limit disables it.
+   */
+  @Override
+  protected void processTuple(byte[] tuple)
+  {
+    super.processTuple(tuple);
+    isNewDataInCurrentWindow = true;
+
+    if (++currentPartTupleCount >= maxTupleCount && maxTupleCount > 0) {
+      rotateCall(getPartitionedFileName());
+    }
+  }
+
+  /**
+   * {@inheritDoc} Does additional checks if file should be rolled over for this
+   * window.
+   */
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+
+    if (!isNewDataInCurrentWindow) {
+      ++currentPartIdleWindows;
+    } else {
+      currentPartIdleWindows = 0;
+    }
+
+    if (checkEndWindowFinalization()) {
+      rotateCall(getPartitionedFileName());
+    }
+  }
+
+  /**
+   * Rollover check at the endWindow: true once the current part has been idle for at
+   * least maxIdleWindows windows ({@code >=} lets a lowered limit take effect); a
+   * negative maxIdleWindows never triggers a rotation.
+   */
+  private boolean checkEndWindowFinalization()
+  {
+    return maxIdleWindows >= 0 && currentPartIdleWindows >= maxIdleWindows;
+  }
+
+  /**
+   * Rotates lastFile via rotate() and resets the tuple and idle-window counters of
+   * the current part. IOException and ExecutionException are logged and rethrown.
+   *
+   * @param lastFile name of the file to rotate
+   */
+  protected void rotateCall(String lastFile)
+  {
+    try {
+      this.rotate(lastFile);
+      currentPartIdleWindows = 0;
+      currentPartTupleCount = 0;
+    } catch (IOException ex) {
+      LOG.error("Exception in file rotation", ex);
+      DTThrowable.rethrow(ex);
+    } catch (ExecutionException ex) {
+      LOG.error("Exception in file rotation", ex);
+      DTThrowable.rethrow(ex);
+    }
+  }
+
+  /**
+   * @return Separator between the tuples
+   */
+  public String getTupleSeparator()
+  {
+    return tupleSeparator;
+  }
+
+  /**
+   * @param separator
+   *          Separator written after every tuple (must not be null); encoded with the platform default charset
+   */
+  public void setTupleSeparator(String separator)
+  {
+    this.tupleSeparator = separator;
+    this.tupleSeparatorBytes = separator.getBytes();
+  }
+
+  /**
+   * @return max tuples in a part file
+   */
+  public long getMaxTupleCount()
+  {
+    return maxTupleCount;
+  }
+
+  /**
+   * @param maxTupleCount
+   *          max tuples in a part file; a non-positive value disables tuple-count rotation
+   */
+  public void setMaxTupleCount(long maxTupleCount)
+  {
+    this.maxTupleCount = maxTupleCount;
+  }
+
+  /**
+   * @return max number of idle windows for rollover
+   */
+  public long getMaxIdleWindows()
+  {
+    return maxIdleWindows;
+  }
+
+  /**
+   * @param maxIdleWindows max number of idle windows for rollover; a negative value disables idle rotation
+   */
+  public void setMaxIdleWindows(long maxIdleWindows)
+  {
+    this.maxIdleWindows = maxIdleWindows;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(BytesFileOutputOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/67ad24ed/library/src/main/java/org/apache/apex/malhar/lib/fs/HDFSOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/HDFSOutputOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/HDFSOutputOperator.java
deleted file mode 100644
index 344e14e..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/HDFSOutputOperator.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.apex.malhar.lib.fs;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.AutoMetric;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * This class is responsible for writing tuples to HDFS. All tuples are written
- * to the same file. Rolling file based on size, no. of tuples, idle windows,
- * elapsed windows is supported.
- * 
- * @param <T>
- */
-
-class HDFSOutputOperator extends AbstractFileOutputOperator<byte[]>
-{
-
-  /**
-   * Name of the file to write the output. Directory for the output file should
-   * be specified in the filePath 
-   */
-  @NotNull
-  private String fileName;
-
-  /**
-   * currentPartName can be of the form fileName_physicalPartionId.partNumber
-   */
-  private String currentPartNameformat = "%s_%d";
-
-  /**
-   * currentPartName is of the form fileName_physicalPartionId.partNumber
-   */
-  private transient String currentPartName;
-
-  /**
-   * Physical partition id for the current partition.
-   */
-  private transient int physicalPartitionId;
-
-  /**
-   * Flag to mark if new data in current application window
-   */
-  private transient boolean isNewDataInCurrentWindow;
-
-  /**
-   * Separator between the tuples
-   */
-  private String tupleSeparator;
-
-  /**
-   * byte[] representation of tupleSeparator
-   */
-  private transient byte[] tupleSeparatorBytes;
-
-  /**
-   * No. of bytes received in current application window
-   */
-  @AutoMetric
-  private long byteCount;
-
-  /**
-   * No. of tuples present in current part for file
-   */
-  private long currentPartTupleCount;
-
-  /**
-   * Max. number of tuples allowed per part. Part file will be finalized after
-   * these many tuples
-   */
-  private long maxTupleCount = Long.MAX_VALUE;
-
-  /**
-   * No. of windows since last new data received
-   */
-  private long currentPartIdleWindows;
-
-  /**
-   * Max number of idle windows for which no new data is added to current part
-   * file. Part file will be finalized after these many idle windows after last
-   * new data.
-   */
-  private long maxIdleWindows = Long.MAX_VALUE;
-
-  /**
-   * Stream codec for string input port
-   */
-  protected StreamCodec<String> stringStreamCodec;
-
-  /**
-   * Default value for stream expiry
-   */
-  private static final long DEFAULT_STREAM_EXPIRY_ACCESS_MILL = 60 * 60 * 1000L; //1 hour
-
-  /**
-   * Default value for rotation windows
-   */
-  private static final int DEFAULT_ROTATION_WINDOWS = 2 * 60 * 10; //10 min  
-
-  /**
-   * Initializing default values for tuple separator, stream expiry, rotation
-   * windows
-   */
-  public HDFSOutputOperator()
-  {
-    setTupleSeparator(System.getProperty("line.separator"));
-    setExpireStreamAfterAccessMillis(DEFAULT_STREAM_EXPIRY_ACCESS_MILL);
-    setRotationWindows(DEFAULT_ROTATION_WINDOWS);
-  }
-
-  /**
-   * Initializing current partition id, part name etc. {@inheritDoc}
-   */
-  @Override
-  public void setup(OperatorContext context)
-  {
-    super.setup(context);
-    physicalPartitionId = context.getId();
-    currentPartName = String.format(currentPartNameformat, fileName, physicalPartitionId);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  protected String getFileName(byte[] tuple)
-  {
-    return currentPartName;
-  }
-
-  /**
-   * Input port for receiving string tuples.
-   */
-  public final transient DefaultInputPort<String> stringInput = new DefaultInputPort<String>()
-  {
-    @Override
-    public void process(String tuple)
-    {
-      processTuple(tuple.getBytes());
-    }
-
-    @Override
-    public StreamCodec<String> getStreamCodec()
-    {
-      if (HDFSOutputOperator.this.stringStreamCodec == null) {
-        return super.getStreamCodec();
-      } else {
-        return stringStreamCodec;
-      }
-    }
-  };
-
-  /**
-   * {@inheritDoc}
-   * 
-   * @return byte[] representation of the given tuple. if input tuple is of type
-   *         byte[] then it is returned as it is. for any other type toString()
-   *         representation is used to generate byte[].
-   */
-  @Override
-  protected byte[] getBytesForTuple(byte[] tuple)
-  {
-    ByteArrayOutputStream bytesOutStream = new ByteArrayOutputStream();
-
-    try {
-      bytesOutStream.write(tuple);
-      bytesOutStream.write(tupleSeparatorBytes);
-      byteCount += bytesOutStream.size();
-      return bytesOutStream.toByteArray();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      try {
-        bytesOutStream.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /**
-   * Initializing per window level fields {@inheritDoc}
-   */
-  @Override
-  public void beginWindow(long windowId)
-  {
-    super.beginWindow(windowId);
-    byteCount = 0;
-    isNewDataInCurrentWindow = false;
-  }
-
-  /**
-   * {@inheritDoc} Does additional state maintenance for rollover
-   */
-  @Override
-  protected void processTuple(byte[] tuple)
-  {
-    super.processTuple(tuple);
-    isNewDataInCurrentWindow = true;
-
-    if (++currentPartTupleCount == maxTupleCount) {
-      rotateCall(currentPartName);
-    }
-  }
-
-  /**
-   * {@inheritDoc} Does additional checks if file should be rolled over for this
-   * window.
-   */
-  @Override
-  public void endWindow()
-  {
-    super.endWindow();
-
-    if (!isNewDataInCurrentWindow) {
-      ++currentPartIdleWindows;
-    } else {
-      currentPartIdleWindows = 0;
-    }
-
-    if (checkEndWindowFinalization()) {
-      rotateCall(currentPartName);
-    }
-  }
-
-  /**
-   * Rollover check at the endWindow
-   */
-  private boolean checkEndWindowFinalization()
-  {
-    if ((currentPartIdleWindows == maxIdleWindows)) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * {@inheritDoc} Handles file rotation along with exception handling
-   * 
-   * @param lastFile
-   */
-  protected void rotateCall(String lastFile)
-  {
-    try {
-      this.rotate(lastFile);
-      currentPartName = String.format(currentPartNameformat, fileName, physicalPartitionId);
-      currentPartIdleWindows = 0;
-      currentPartTupleCount = 0;
-    } catch (IOException ex) {
-      LOG.error("Exception in file rotation", ex);
-      DTThrowable.rethrow(ex);
-    } catch (ExecutionException ex) {
-      LOG.error("Exception in file rotation", ex);
-      DTThrowable.rethrow(ex);
-    }
-  }
-
-  /**
-   * @return File name for writing output. All tuples are written to the same
-   *         file.
-   */
-  public String getFileName()
-  {
-    return fileName;
-  }
-
-  /**
-   * @param fileName
-   *          File name for writing output. All tuples are written to the same
-   *          file.
-   */
-  public void setFileName(String fileName)
-  {
-    this.fileName = fileName;
-  }
-
-  /**
-   * @return string format specifier for current part name
-   */
-  public String getCurrentPartNameformat()
-  {
-    return currentPartNameformat;
-  }
-
-  /**
-   * @param currentPartNameformat
-   *          string format specifier for current part name
-   */
-  public void setCurrentPartNameformat(String currentPartNameformat)
-  {
-    this.currentPartNameformat = currentPartNameformat;
-  }
-
-  /**
-   * @return name of the current part file
-   */
-  public String getCurrentPartName()
-  {
-    return currentPartName;
-  }
-
-  /**
-   * @param currentPartName
-   *          name of the current part file
-   */
-  public void setCurrentPartName(String currentPartName)
-  {
-    this.currentPartName = currentPartName;
-  }
-
-  /**
-   * @return Separator between the tuples
-   */
-  public String getTupleSeparator()
-  {
-    return tupleSeparator;
-  }
-
-  /**
-   * @param separator
-   *          Separator between the tuples
-   */
-  public void setTupleSeparator(String separator)
-  {
-    this.tupleSeparator = separator;
-    this.tupleSeparatorBytes = separator.getBytes();
-  }
-
-  /**
-   * @return max tuples in a part file
-   */
-  public long getMaxTupleCount()
-  {
-    return maxTupleCount;
-  }
-
-  /**
-   * @param maxTupleCount
-   *          max tuples in a part file
-   */
-  public void setMaxTupleCount(long maxTupleCount)
-  {
-    this.maxTupleCount = maxTupleCount;
-  }
-
-  /**
-   * @return max number of idle windows for rollover
-   */
-  public long getMaxIdleWindows()
-  {
-    return maxIdleWindows;
-  }
-
-  /**
-   * @param maxIdleWindows
-   *          max number of idle windows for rollover
-   */
-  public void setMaxIdleWindows(long maxIdleWindows)
-  {
-    this.maxIdleWindows = maxIdleWindows;
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSOutputOperator.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/67ad24ed/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java
index 2df4ae0..07c74db 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java
@@ -144,6 +144,7 @@ public class AbstractSingleFileOutputOperatorTest
   public void testSingleFileCompletedWrite()
   {
     writer.setOutputFileName(SINGLE_FILE);
+    writer.setPartitionedFileNameformat(null);
 
     writer.setFilePath(testMeta.getDir());
 
@@ -177,7 +178,8 @@ public class AbstractSingleFileOutputOperatorTest
   public void testSingleFileFailedWrite()
   {
     writer.setOutputFileName(SINGLE_FILE);
-
+    writer.setPartitionedFileNameformat("");
+    
     File meta = new File(testMeta.getDir());
     writer.setFilePath(meta.getAbsolutePath());
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/67ad24ed/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java
new file mode 100644
index 0000000..1ea7352
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/BytesFileOutputOperatorTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest;
+import com.datatorrent.netlet.util.DTThrowable;
+
+public class BytesFileOutputOperatorTest extends AbstractFileOutputOperatorTest
+{
+  /**
+   * Test file rollover in case of idle windows
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testIdleWindowsFinalize() throws IOException
+  {
+    BytesFileOutputOperator writer = new BytesFileOutputOperator();
+    writer.setOutputFileName("output.txt");
+    writer.setFilePath(testMeta.getDir());
+    writer.setAlwaysWriteToTmp(true);
+    writer.setMaxIdleWindows(5);
+    writer.setup(testMeta.testOperatorContext);
+
+    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {}, {}, {}, {"6a", "6b" }, {"7a", "7b" }, {}, {}, {},
+        {}, {}, {"13a", "13b" }, {"14a", "14b" }, {}, {}, {}, {"18a", "18b" }, {"19a", "19b" }, {}, {}, {}, {}, {},
+        {}, {"26a", "26b"} };
+
+    for (int i = 0; i <= 12; i++) {
+      writer.beginWindow(i);
+      for (String t : tuples[i]) {
+        writer.stringInput.put(t);
+      }
+      writer.endWindow();
+    }
+    writer.committed(10);
+
+    for (int i = 13; i <= 26; i++) {
+      writer.beginWindow(i);
+      for (String t : tuples[i]) {
+        writer.stringInput.put(t);
+      }
+      writer.endWindow();
+    }
+    writer.committed(20);
+    writer.committed(26);
+
+    String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n",
+        "26a\n26b\n" };
+
+    for (int i = 0; i < expected.length; i++) {
+      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
+    }
+  }
+
+  /**
+   * Test file rollover for tuple count
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testTupleCountFinalize() throws IOException
+  {
+    BytesFileOutputOperator writer = new BytesFileOutputOperator();
+    writer.setOutputFileName("output.txt");
+    writer.setFilePath(testMeta.getDir());
+    writer.setAlwaysWriteToTmp(true);
+    writer.setMaxTupleCount(10);
+    writer.setup(testMeta.testOperatorContext);
+
+    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {"3a", "3b" }, {"4a", "4b" }, {}, {"6a", "6b" },
+        {"7a", "7b" }, {}, {}, {"9a" }, {"10a", "10b" }, {}, {"12a" }, {"13a", "13b"}, {"14a", "14b" }, {}, {},
+        {}, {"18a", "18b" }, {"19a", "19b" }, {"20a" }, {"21a" }, {"22a" }};
+
+    for (int i = 0; i < tuples.length; i++) {
+      writer.beginWindow(i);
+      for (String t : tuples[i]) {
+        writer.stringInput.put(t);
+      }
+      writer.endWindow();
+      if (i % 10 == 0) {
+        writer.committed(10);
+      }
+    }
+    writer.committed(tuples.length);
+
+    String[] expected = {"0a\n0b\n1a\n1b\n3a\n3b\n4a\n4b\n6a\n6b\n", "7a\n7b\n9a\n10a\n10b\n12a\n13a\n13b\n14a\n14b\n",
+        "18a\n18b\n19a\n19b\n20a\n21a\n22a\n" };
+
+    for (int i = 0; i < expected.length; i++) {
+      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
+    }
+  }
+
+  /**
+   * Asserts baseFilePath[.fileCount], or with checkTmp a .tmp file starting with it, holds expectedOutput.
+   */
+  public static void checkOutput(int fileCount, String baseFilePath, String expectedOutput, boolean checkTmp)
+  {
+    if (fileCount >= 0) {
+      baseFilePath += "." + fileCount;
+    }
+    File file = new File(baseFilePath);
+
+    if (checkTmp && !file.exists()) {
+      String[] extensions = {"tmp"};
+      Collection<File> tmpFiles = FileUtils.listFiles(file.getParentFile(), extensions, false);
+      for (File tmpFile : tmpFiles) {
+        if (tmpFile.getPath().startsWith(file.getPath())) {
+          file = tmpFile;
+          break;
+        }
+      }
+    }
+
+    String fileContents = null;
+    try {
+      fileContents = FileUtils.readFileToString(file);
+    } catch (IOException ex) {
+      DTThrowable.rethrow(ex);
+    }
+
+    Assert.assertEquals("Single file " + fileCount + " output contents", expectedOutput, fileContents);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/67ad24ed/library/src/test/java/org/apache/apex/malhar/lib/fs/HDFSOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/HDFSOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/HDFSOutputOperatorTest.java
deleted file mode 100644
index f797676..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/HDFSOutputOperatorTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.apex.malhar.lib.fs;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.commons.io.FileUtils;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest;
-import com.datatorrent.netlet.util.DTThrowable;
-
-public class HDFSOutputOperatorTest extends AbstractFileOutputOperatorTest
-{
-
-  /**
-   * Test file rollover in case of idle windows
-   * 
-   * @throws IOException
-   */
-  @Test
-  public void testIdleWindowsFinalize() throws IOException
-  {
-    HDFSOutputOperator writer = new HDFSOutputOperator();
-    writer.setFileName("output.txt");
-    writer.setFilePath(testMeta.getDir());
-    writer.setAlwaysWriteToTmp(true);
-    writer.setMaxIdleWindows(5);
-    writer.setup(testMeta.testOperatorContext);
-
-    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {}, {}, {}, {"6a", "6b" }, {"7a", "7b" }, {}, {}, {},
-        {}, {}, {"13a", "13b" }, {"14a", "14b" }, {}, {}, {}, {"18a", "18b" }, {"19a", "19b" }, {}, {}, {}, {}, {},
-        {}, {"26a", "26b"} };
-
-    for (int i = 0; i <= 12; i++) {
-      writer.beginWindow(i);
-      for (String t : tuples[i]) {
-        writer.stringInput.put(t);
-      }
-      writer.endWindow();
-    }
-    writer.committed(10);
-
-    for (int i = 13; i <= 26; i++) {
-      writer.beginWindow(i);
-      for (String t : tuples[i]) {
-        writer.stringInput.put(t);
-      }
-      writer.endWindow();
-    }
-    writer.committed(20);
-    writer.committed(26);
-
-    String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n",
-        "26a\n26b\n" };
-
-    for (int i = 0; i < expected.length; i++) {
-      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
-    }
-  }
-
-  /**
-   * Test file rollover for tuple count
-   * 
-   * @throws IOException
-   */
-  @Test
-  public void testTupleCountFinalize() throws IOException
-  {
-    HDFSOutputOperator writer = new HDFSOutputOperator();
-    writer.setFileName("output.txt");
-    writer.setFilePath(testMeta.getDir());
-    writer.setAlwaysWriteToTmp(true);
-    writer.setMaxTupleCount(10);
-    writer.setup(testMeta.testOperatorContext);
-
-    String[][] tuples = {{"0a", "0b" }, {"1a", "1b" }, {}, {"3a", "3b" }, {"4a", "4b" }, {}, {"6a", "6b" },
-        {"7a", "7b" }, {}, {}, {"9a" }, {"10a", "10b" }, {}, {"12a" }, {"13a", "13b"}, {"14a", "14b" }, {}, {},
-        {}, {"18a", "18b" }, {"19a", "19b" }, {"20a" }, {"21a" }, {"22a" }};
-
-    for (int i = 0; i < tuples.length; i++) {
-      writer.beginWindow(i);
-      for (String t : tuples[i]) {
-        writer.stringInput.put(t);
-      }
-      writer.endWindow();
-      if (i % 10 == 0) {
-        writer.committed(10);
-      }
-    }
-    writer.committed(tuples.length);
-
-    String[] expected = {"0a\n0b\n1a\n1b\n3a\n3b\n4a\n4b\n6a\n6b\n", "7a\n7b\n9a\n10a\n10b\n12a\n13a\n13b\n14a\n14b\n",
-        "18a\n18b\n19a\n19b\n20a\n21a\n22a\n" };
-
-    for (int i = 0; i < expected.length; i++) {
-      checkOutput(i, testMeta.getDir() + "/output.txt_0", expected[i], true);
-    }
-  }
-
-  public static void checkOutput(int fileCount, String baseFilePath, String expectedOutput, boolean checkTmp)
-  {
-    if (fileCount >= 0) {
-      baseFilePath += "." + fileCount;
-    }
-
-    File file = new File(baseFilePath);
-
-    if (!file.exists()) {
-      String[] extensions = {"tmp"};
-      Collection<File> tmpFiles = FileUtils.listFiles(file.getParentFile(), extensions, false);
-      for (File tmpFile : tmpFiles) {
-        if (file.getPath().startsWith(baseFilePath)) {
-          file = tmpFile;
-          break;
-        }
-      }
-    }
-
-    String fileContents = null;
-
-    try {
-      fileContents = FileUtils.readFileToString(file);
-    } catch (IOException ex) {
-      DTThrowable.rethrow(ex);
-    }
-
-    Assert.assertEquals("Single file " + fileCount + " output contents", expectedOutput, fileContents);
-  }
-}


[2/2] incubator-apex-malhar git commit: Merge remote-tracking branch 'yogi/APEXMALHAR-2077-AbstractSingleFileOutputOperator-partitionId'

Posted by ra...@apache.org.
Merge remote-tracking branch 'yogi/APEXMALHAR-2077-AbstractSingleFileOutputOperator-partitionId'


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1d3e20c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1d3e20c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1d3e20c6

Branch: refs/heads/master
Commit: 1d3e20c69428a4c4101c2b6da22b631d81a1961d
Parents: f94c2d8 67ad24e
Author: Munagala V. Ramanath <ra...@datatorrent.com>
Authored: Tue May 17 08:47:26 2016 -0700
Committer: Munagala V. Ramanath <ra...@datatorrent.com>
Committed: Tue May 17 08:47:26 2016 -0700

----------------------------------------------------------------------
 .../io/fs/AbstractSingleFileOutputOperator.java |  65 ++-
 .../malhar/lib/fs/BytesFileOutputOperator.java  | 295 ++++++++++++++
 .../apex/malhar/lib/fs/HDFSOutputOperator.java  | 394 -------------------
 .../AbstractSingleFileOutputOperatorTest.java   |   4 +-
 .../lib/fs/BytesFileOutputOperatorTest.java     | 151 +++++++
 .../malhar/lib/fs/HDFSOutputOperatorTest.java   | 151 -------
 6 files changed, 513 insertions(+), 547 deletions(-)
----------------------------------------------------------------------