Posted to dev@apex.apache.org by chandnisingh <gi...@git.apache.org> on 2016/04/15 04:26:37 UTC

[GitHub] incubator-apex-malhar pull request: [ONLY FOR REVIEW] APEXMALHAR 1...

GitHub user chandnisingh opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242

    [ONLY FOR REVIEW] APEXMALHAR 1965

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chandnisingh/incubator-apex-malhar APEXMALHAR-1965

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/242.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #242
    
----
commit 36bd1cd5aebfdbb70336ef1c68d0d7f74ea41288
Author: Tushar R. Gosavi <tu...@apache.org>
Date:   2016-02-27T17:03:17Z

    APEXMALHAR-1965 Utility classes for Write Ahead Log support

commit 54a407c9b0843630f0bea2b56e0cba11708a863c
Author: Chandni Singh <cs...@apache.org>
Date:   2016-04-12T21:59:35Z

    Wal changes

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60151911
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/IOUtils.java ---
    @@ -0,0 +1,56 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.utils;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +
    +public class IOUtils
    +{
    +  /**
    +   * Utility method to copy partial data from input stream to output stream.
    +   * @param inputStream        input stream
    +   * @param inputStreamOffset  offset in the input stream till which the data is copied.
    +   * @param outputStream       output stream
    +   * @throws IOException
    +   */
    +  public static void copyPartial(InputStream inputStream, long inputStreamOffset, OutputStream outputStream)
    --- End diff --
    
    IOUtils.copyLarge copies all of the data from the input stream to the output stream.
    We need to copy data only up to a particular offset in the input stream. This is also a common need in AbstractFileOutputOperator, which could use this utility function.
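A minimal sketch of such a helper follows. It assumes `inputStreamOffset` means "copy this many bytes from the stream's current position" and that a stream ending early is an error. The class name `PartialCopy` and the `EOFException` behaviour are assumptions, not necessarily what the PR's `copyPartial` does.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical helper illustrating a bounded stream copy. */
final class PartialCopy
{
  private static final int BUFFER_SIZE = 8192;

  private PartialCopy()
  {
  }

  /**
   * Copies exactly {@code length} bytes from {@code in} to {@code out}.
   * Throws EOFException if the input ends before {@code length} bytes were read.
   */
  static void copyPartial(InputStream in, long length, OutputStream out) throws IOException
  {
    byte[] buffer = new byte[BUFFER_SIZE];
    long remaining = length;
    while (remaining > 0) {
      // bound every read so no byte past the requested offset is consumed
      int toRead = (int)Math.min(buffer.length, remaining);
      int read = in.read(buffer, 0, toRead);
      if (read == -1) {
        throw new EOFException("stream ended " + remaining + " bytes before offset " + length);
      }
      out.write(buffer, 0, read);
      remaining -= read;
    }
  }
}
```

Bounding each `read` by the remaining count is what distinguishes this from a copy-everything loop: the input stream is left positioned exactly at the offset.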



[GitHub] incubator-apex-malhar pull request: [ONLY FOR REVIEW] APEXMALHAR 1...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r59823701
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java ---
    @@ -0,0 +1,100 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.IOException;
    +
    +/**
    + * This interface represents a write-ahead log that can be used by operators.
    + * The WAL is split into two interfaces: a WALWriter, which allows writing
    + * data, and a WALReader, which provides an iterator-like interface to read entries
    + * written to the WAL.
    + *
    + * @param <READER> Type of WAL Reader
    + * @param <WRITER> Type of WAL Writer
    + */
    +public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    +{
    +  void setup();
    +
    +  void teardown();
    +
    +  void beforeCheckpoint(long window);
    +
    +  void committed(long window);
    +
    +  READER getReader() throws IOException;
    +
    +  WRITER getWriter() throws IOException;
    +
    +  /**
    +   * Provides iterator like interface to read entries from the WAL.
    +   * @param <T> type of WAL entries
    +   * @param <P> type of Pointer in the WAL
    +   */
    +  interface WALReader<T, P>
    +  {
    +    /**
    +     * Seek to a position in the middle of the WAL. This is used primarily during recovery,
    +     * when we need to start recovering data from the middle of a WAL file.
    +     */
    +    void seek(P pointer) throws IOException;
    +
    +    /**
    +     * Advance the WAL by one entry. Returns true if it can advance, false
    +     * otherwise; throws an exception in case of any other error.
    +     *
    +     * @return true if next data item is read successfully, false if data can not be read.
    +     * @throws IOException
    +     */
    +    boolean advance() throws IOException;
    +
    +    /**
    +     * Return the current entry from the WAL, or null if the end of the file has been reached.
    +     *
    +     * @return the current entry
    +     */
    +    T get();
    +  }
    +
    +  /**
    +   * Provides methods to write entries to the WAL.
    +   * @param <T>
    +   * @param <P>
    +   */
    +  interface WALWriter<T, P>
    --- End diff --
    
    Is P needed for the writer?
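To make the question concrete, here is a minimal sketch built on a hypothetical in-memory log whose pointer is a `long` entry index. In this model the reader needs the pointer for `seek`, while the writer needs it only if it exposes its current position (the hypothetical `getPointer()` below) so that an operator can record where recovery should resume. This is not the interface in the PR.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory WAL used only to show where a pointer type appears.
 * The pointer is a long: the index of the next entry to be written or read.
 */
final class InMemoryWal<T>
{
  private final List<T> entries = new ArrayList<>();

  WalWriter writer()
  {
    return new WalWriter();
  }

  WalReader reader()
  {
    return new WalReader();
  }

  /** Writer side: needs a pointer type only because it hands out positions. */
  final class WalWriter
  {
    void append(T entry)
    {
      entries.add(entry);
    }

    /** Hypothetical: position at which the next appended entry will land. */
    long getPointer()
    {
      return entries.size();
    }
  }

  /** Reader side, mirroring WAL.WALReader: seek(P), advance(), get(). */
  final class WalReader
  {
    private long next;
    private T current;

    void seek(long pointer)
    {
      next = pointer;
      current = null;
    }

    boolean advance()
    {
      if (next >= entries.size()) {
        current = null; // end of log reached
        return false;
      }
      current = entries.get((int)next);
      next++;
      return true;
    }

    T get()
    {
      return current;
    }
  }
}
```

If a writer never exposes positions, its `P` parameter is unused, which is the case for an `append`-only writer.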



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60343528
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    --- End diff --
    
    Purge support is currently missing from the WAL. I think I will add a ```delete(P pointer)``` method to the WALWriter API.
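A minimal sketch of what a pointer-based purge could do follows. It assumes the WAL is split into numbered part files, as `getPartFilePath(int partNumber)` elsewhere in this PR suggests, and that purging removes whole part files only. `PartPurger` and its `Pointer` class are hypothetical stand-ins for the WAL and `FileSystemWALPointer`, and part files are modelled as map entries rather than real files.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical purge logic for a WAL split into numbered part files.
 * Part files are modelled in memory: part number -> file name.
 */
final class PartPurger
{
  /** Stand-in for a (partNum, offset) WAL pointer. */
  static final class Pointer
  {
    final int partNum;
    final long offset;

    Pointer(int partNum, long offset)
    {
      this.partNum = partNum;
      this.offset = offset;
    }
  }

  private final NavigableMap<Integer, String> parts = new TreeMap<>();

  void addPart(int partNum, String fileName)
  {
    parts.put(partNum, fileName);
  }

  /**
   * Deletes every part that lies entirely before the pointer. The part containing
   * the pointer is kept, because entries at or after its offset are still live.
   * @return number of parts removed
   */
  int delete(Pointer pointer)
  {
    // headMap is a live view, so clearing it removes the entries from 'parts'
    Map<Integer, String> obsolete = parts.headMap(pointer.partNum, false);
    int removed = obsolete.size();
    // a file-based implementation would delete each file here (e.g. FileContext.delete)
    obsolete.clear();
    return removed;
  }

  int remainingParts()
  {
    return parts.size();
  }

  Integer firstRemainingPart()
  {
    return parts.isEmpty() ? null : parts.firstKey();
  }
}
```

Deleting only whole parts avoids rewriting a part file that still holds live entries.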



[GitHub] incubator-apex-malhar pull request: [ONLY FOR REVIEW] APEXMALHAR 1...

Posted by ishark <gi...@git.apache.org>.
Github user ishark commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r59823744
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java ---
    @@ -0,0 +1,100 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.IOException;
    +
    +/**
    + * This interface represents a write-ahead log that can be used by operators.
    + * The WAL is split into two interfaces: a WALWriter, which allows writing
    + * data, and a WALReader, which provides an iterator-like interface to read entries
    + * written to the WAL.
    + *
    + * @param <READER> Type of WAL Reader
    + * @param <WRITER> Type of WAL Writer
    + */
    +public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    +{
    +  void setup();
    +
    +  void teardown();
    +
    +  void beforeCheckpoint(long window);
    +
    +  void committed(long window);
    +
    +  READER getReader() throws IOException;
    +
    +  WRITER getWriter() throws IOException;
    +
    +  /**
    +   * Provides iterator like interface to read entries from the WAL.
    +   * @param <T> type of WAL entries
    +   * @param <P> type of Pointer in the WAL
    +   */
    +  interface WALReader<T, P>
    +  {
    +    /**
    +     * Seek to a position in the middle of the WAL. This is used primarily during recovery,
    +     * when we need to start recovering data from the middle of a WAL file.
    +     */
    +    void seek(P pointer) throws IOException;
    +
    +    /**
    +     * Advance the WAL by one entry. Returns true if it can advance, false
    +     * otherwise; throws an exception in case of any other error.
    +     *
    +     * @return true if next data item is read successfully, false if data can not be read.
    +     * @throws IOException
    +     */
    +    boolean advance() throws IOException;
    +
    +    /**
    +     * Return the current entry from the WAL, or null if the end of the file has been reached.
    +     *
    +     * @return the current entry
    +     */
    +    T get();
    +  }
    +
    +  /**
    +   * Provides methods to write entries to the WAL.
    +   * @param <T>
    +   * @param <P>
    +   */
    +  interface WALWriter<T, P>
    +  {
    +    /**
    +     * Write an entry to the WAL
    +     */
    +    int append(T entry) throws IOException;
    --- End diff --
    
    Would it be useful to have an API for bulk append of bytes?
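One possible shape for a bulk append follows. It assumes entries are written as length-prefixed records (an `int` length followed by the serialized bytes) and that the caller already holds serialized `byte[]` entries. `LengthPrefixedLog`, `appendAll` and `readAll` are hypothetical names. The `int` return value is assumed to be the number of bytes written, because the PR does not document what `append` returns.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record format: [int length][length bytes] per entry. */
final class LengthPrefixedLog
{
  private final DataOutputStream out;

  LengthPrefixedLog(OutputStream sink)
  {
    this.out = new DataOutputStream(sink);
  }

  /** Appends one pre-serialized entry; returns bytes written including the 4-byte header. */
  int append(byte[] entry) throws IOException
  {
    out.writeInt(entry.length);
    out.write(entry);
    return Integer.BYTES + entry.length;
  }

  /** Bulk variant: appends several entries and flushes once at the end. */
  int appendAll(List<byte[]> entries) throws IOException
  {
    int total = 0;
    for (byte[] entry : entries) {
      total += append(entry);
    }
    out.flush();
    return total;
  }

  /** Reads entries back until the end of the data. */
  static List<byte[]> readAll(byte[] log) throws IOException
  {
    List<byte[]> result = new ArrayList<>();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(log));
    while (true) {
      int length;
      try {
        length = in.readInt();
      } catch (EOFException e) {
        break; // clean end of log
      }
      byte[] entry = new byte[length];
      in.readFully(entry);
      result.add(entry);
    }
    return result;
  }
}
```

A bulk API lets callers skip per-entry serialization and amortize the flush across a batch.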



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60683826
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,688 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A WAL implementation that is file based.
    + * <p/>
    + * Note:<br/>
    + * FileSystem writer and reader operations should not be interleaved, because intermingling them will cause
    + * problems. Typically the WAL reader is only used during recovery.<br/>
    + *
    + * Also, this implementation is not thread-safe: the filesystem WAL writer and reader operations should be performed
    + * in the operator's thread.
    + */
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final Map<Integer, String> tempPartFiles = new TreeMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    --- End diff --
    
    OK, I will.
    This is the HDFS issue with the rename + overwrite option that I was referring to:
    https://issues.apache.org/jira/browse/HDFS-6757
    The advice there was to avoid using rename with the overwrite option on a file that is under construction.
    
    Should I document all of this here as well?



[GitHub] incubator-apex-malhar pull request: [ONLY FOR REVIEW] APEXMALHAR 1...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r59827549
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java ---
    @@ -0,0 +1,100 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.IOException;
    +
    +/**
    + * This interface represents a write-ahead log that can be used by operators.
    + * The WAL is split into two interfaces: a WALWriter, which allows writing
    + * data, and a WALReader, which provides an iterator-like interface to read entries
    + * written to the WAL.
    + *
    + * @param <READER> Type of WAL Reader
    + * @param <WRITER> Type of WAL Writer
    + */
    +public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    +{
    +  void setup();
    +
    +  void teardown();
    +
    +  void beforeCheckpoint(long window);
    +
    +  void committed(long window);
    +
    +  READER getReader() throws IOException;
    +
    +  WRITER getWriter() throws IOException;
    +
    +  /**
    +   * Provides iterator like interface to read entries from the WAL.
    +   * @param <T> type of WAL entries
    +   * @param <P> type of Pointer in the WAL
    +   */
    +  interface WALReader<T, P>
    +  {
    +    /**
    +     * Seek to a position in the middle of the WAL. This is used primarily during recovery,
    +     * when we need to start recovering data from the middle of a WAL file.
    +     */
    +    void seek(P pointer) throws IOException;
    +
    +    /**
    +     * Advance the WAL by one entry. Returns true if it can advance, false
    +     * otherwise; throws an exception in case of any other error.
    +     *
    +     * @return true if next data item is read successfully, false if data can not be read.
    +     * @throws IOException
    +     */
    +    boolean advance() throws IOException;
    +
    +    /**
    +     * Return the current entry from the WAL, or null if the end of the file has been reached.
    +     *
    +     * @return the current entry
    +     */
    +    T get();
    +  }
    +
    +  /**
    +   * Provides methods to write entries to the WAL.
    +   * @param <T>
    +   * @param <P>
    +   */
    +  interface WALWriter<T, P>
    +  {
    +    /**
    +     * Write an entry to the WAL
    +     */
    +    int append(T entry) throws IOException;
    +
    +  }
    +
    +  /**
    +   * Serializer interface used while reading and writing entries to the WAL.
    +   * @param <T>
    +   */
    +  interface Serde<T>
    --- End diff --
    
    Can you please move it to utils so that it can be reused?
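A minimal sketch of a reusable serializer interface follows. It assumes the interface takes both the object type and the serialized type as parameters. The later FileSystemWAL revision in this thread declares a `Serde<T, byte[]>` field imported from `org.apache.apex.malhar.lib.utils`. The method names `serialize`/`deserialize` and the `StringSerde` example are assumptions, not the API that was actually moved.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical generic serializer/deserializer suitable for a utils package.
 * @param <T> object type
 * @param <S> serialized form (byte[] for the WAL)
 */
interface Serde<T, S>
{
  S serialize(T object);

  T deserialize(S serialized);
}

/** Example implementation that stores strings as UTF-8 bytes. */
final class StringSerde implements Serde<String, byte[]>
{
  @Override
  public byte[] serialize(String object)
  {
    return object.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String deserialize(byte[] serialized)
  {
    return new String(serialized, StandardCharsets.UTF_8);
  }
}
```

Parameterizing the serialized type keeps the interface free of WAL-specific assumptions, so other components could reuse it with a different wire format.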



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60275720
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALReader.open(fileContext);
    +      fileSystemWALWriter.open(fileContext);
    +
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader<T> getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Reader. This can be used to override the default WAL reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader<T> fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter<T> getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter<T> fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL Entry serde
    +   */
    +  public Serde<T, byte[]> getSerde()
    +  {
    +    return serde;
    +  }
    +
    +  /**
    +   * Sets the serde which is used for wal entry serialization and de-serialization
    +   *
    +   * @param serde serializer/deserializer
    +   */
    +  public void setSerde(@NotNull Serde<T, byte[]> serde)
    +  {
    +    this.serde = Preconditions.checkNotNull(serde, "serde");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   * @param <T> type of tuple.
    +   */
    +  public static class FileSystemWALReader<T> implements WAL.WALReader<T, FileSystemWALPointer>
    +  {
    +    private T entry;
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +      //initialize the input stream
    +      inputStream = getInputStream(currentPointer);
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Path walPartPath = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      if (fileContext.util().exists(walPartPath)) {
    +        DataInputStream stream = fileContext.open(walPartPath);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = walPartPath;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public boolean advance() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null &&
    +            currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
    +          int len = inputStream.readInt();
    +          Preconditions.checkState(len >= 0, "negative length");
    +
    +          byte[] data = new byte[len];
    +          inputStream.readFully(data);
    +
    +          entry = fileSystemWAL.serde.deserialize(data);
    +          currentPointer.offset += data.length + 4;
    +          return true;
    +        }
    +      } while (nextSegment());
    +
    +      entry = null;
    +      return false;
    +    }
    +
    +    @Override
    +    public T get()
    +    {
    +      return entry;
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem WAL Writer.
    +   * @param <T> type of tuple
    +   */
    +  public static class FileSystemWALWriter<T> implements WAL.WALWriter<T>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +    private transient DataOutputStream outputStream;
    +
    +    //windowId => Latest part which can be finalized.
    +    private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
    +
    +    //part => tmp file path;
    +    private final Map<Integer, String> tmpFiles = new TreeMap<>();
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALWriter()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALWriter(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
    +      recover();
    +      if (outputStream == null) {
    +        outputStream = getOutputStream(currentPointer);
    +      }
    +    }
    +
    +    private void recover() throws IOException
    +    {
    +      LOG.debug("current pointer {}", currentPointer);
    +      String tmpFilePath = tmpFiles.get(currentPointer.getPartNum());
    +      if (tmpFilePath != null) {
    +
    +        Path tmpPath = new Path(tmpFilePath);
    +        if (fileContext.util().exists(tmpPath)) {
    +          LOG.debug("tmp path exists {}", tmpPath);
    +
    +          outputStream = getOutputStream(currentPointer);
    +          DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
    +
    +          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
    +
    +          outputStream.flush();
    +          //remove old tmp
    +          inputStreamOldTmp.close();
    +          LOG.debug("delete tmp {}", tmpPath);
    +          fileContext.delete(tmpPath, true);
    +        }
    +      }
    +
    +      //find all valid path names
    +      Set<String> validPathNames = new HashSet<>();
    +      for (Map.Entry<Integer, String> entry : tmpFiles.entrySet()) {
    +        if (entry.getKey() <= currentPointer.partNum) {
    +          validPathNames.add(new Path(entry.getValue()).getName());
    +        }
    +      }
    +      LOG.debug("valid names {}", validPathNames);
    +
    +      //there can be a failure just between the flush and the actual checkpoint which can leave some stray tmp files
    +      //which aren't accounted for in the tmp files map
    +      Path walPath = new Path(fileSystemWAL.filePath);
    +      Path parentWAL = walPath.getParent();
    +      if (parentWAL != null && fileContext.util().exists(parentWAL)) {
    +        RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
    +        while (remoteIterator.hasNext()) {
    +          FileStatus status = remoteIterator.next();
    +          String fileName = status.getPath().getName();
    +          if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
    +              !validPathNames.contains(fileName)) {
    +            LOG.debug("delete stray tmp {}", status.getPath());
    +            fileContext.delete(status.getPath(), true);
    +          }
    +
    +        }
    +      }
    +
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (outputStream != null) {
    +        flush();
    +        outputStream.close();
    +        outputStream = null;
    +        LOG.debug("closed {}", currentPointer.partNum);
    +      }
    +    }
    +
    +    @Override
    +    public int append(T entry) throws IOException
    +    {
    +      byte[] slice = fileSystemWAL.serde.serialize(entry);
    --- End diff --
    
    Will make the change. 
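For reference, the reader in the diff above expects each entry to be framed as a 4-byte length followed by the serialized bytes: advance() calls readInt(), then readFully(), and moves the offset by data.length + 4. The sketch below is a minimal, hypothetical illustration of that framing. The class and method names are invented and are not the PR's API. Unlike advance(), which compares the offset against the part file's length, this sketch simply stops at end of stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of length-prefixed WAL framing; not the classes from the PR. */
public class WalFramingSketch
{
  /** Appends one entry as [int length][bytes] and returns the bytes written (length + 4). */
  static int append(DataOutputStream out, byte[] entry) throws IOException
  {
    out.writeInt(entry.length);
    out.write(entry);
    return entry.length + 4;
  }

  /** Reads entries until end of stream, mirroring readInt() followed by readFully(). */
  static List<byte[]> readAll(DataInputStream in) throws IOException
  {
    List<byte[]> entries = new ArrayList<>();
    while (true) {
      int len;
      try {
        len = in.readInt();
      } catch (EOFException e) {
        return entries; // no more length prefixes: end of this part
      }
      if (len < 0) {
        throw new IllegalStateException("negative length");
      }
      byte[] data = new byte[len];
      in.readFully(data);
      entries.add(data);
    }
  }

  public static void main(String[] args) throws IOException
  {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    long offset = 0;
    offset += append(out, "first".getBytes(StandardCharsets.UTF_8));
    offset += append(out, "second".getBytes(StandardCharsets.UTF_8));
    out.flush();

    List<byte[]> read = readAll(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(offset + " " + read.size() + " " + new String(read.get(1), StandardCharsets.UTF_8));
  }
}
```

The offset arithmetic matches the reader's bookkeeping: every entry advances the position by its payload length plus the 4-byte prefix.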



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60678798
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,688 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A WAL implementation that is file based.
    + * <p/>
    + * Note:<br/>
    + * The FileSystem Writer and Reader operations should not alternate because intermingling these operations will cause
    + * problems. Typically the WAL Reader will only be used in recovery.<br/>
    + *
    + * Also, this implementation is not thread-safe: the filesystem WAL writer and reader operations should be performed
    + * in the operator's thread.
    + */
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final Map<Integer, String> tempPartFiles = new TreeMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    --- End diff --
    
    Why does this happen in committed()? Shouldn't the file be renamed as soon as it is closed?
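One plausible reading of the design in the diff, stated as an assumption rather than the author's answer: the writer records, per window, the latest part that may be finalized (pendingFinalization), and the rename happens only in committed(). If the operator were restored to an earlier checkpoint, it could resume writing a part from the checkpointed offset (recover() copies the old tmp file up to currentPointer.offset), so a part renamed as soon as it is closed could disagree with the restored state. The minimal in-memory sketch below shows that bookkeeping. The class and its methods are hypothetical, and adding a part number to a list stands in for the actual tmp-to-final rename.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: parts closed during a window are only finalized
 * (e.g. renamed from tmp to final) once that window is committed.
 */
public class CommitFinalizationSketch
{
  // windowId => latest part number that can be finalized once that window commits
  private final TreeMap<Long, Integer> pendingFinalization = new TreeMap<>();
  private final List<Integer> finalizedParts = new ArrayList<>();
  private int lastFinalizedPart = -1;

  /** Records that parts up to partNum were closed (rolled over) during windowId. */
  void partClosed(long windowId, int partNum)
  {
    pendingFinalization.put(windowId, partNum);
  }

  /** Finalizes every part recorded for windows up to and including the committed one. */
  void committed(long committedWindow)
  {
    Iterator<Map.Entry<Long, Integer>> it =
        pendingFinalization.headMap(committedWindow, true).entrySet().iterator();
    while (it.hasNext()) {
      int latestPart = it.next().getValue();
      for (int part = lastFinalizedPart + 1; part <= latestPart; part++) {
        finalizedParts.add(part); // stand-in for renaming the part's tmp file
      }
      lastFinalizedPart = Math.max(lastFinalizedPart, latestPart);
      it.remove(); // removes the entry from the backing map through the headMap view
    }
  }

  List<Integer> finalizedParts()
  {
    return finalizedParts;
  }

  public static void main(String[] args)
  {
    CommitFinalizationSketch wal = new CommitFinalizationSketch();
    wal.partClosed(10L, 0);
    wal.partClosed(12L, 2);
    wal.committed(11L);
    System.out.println(wal.finalizedParts());
    wal.committed(12L);
    System.out.println(wal.finalizedParts());
  }
}
```

Committing window 11 finalizes only part 0; parts 1 and 2 wait until window 12 is committed, so a rollback to any checkpoint after window 11 still finds their tmp files intact.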




[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60258161
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALReader.open(fileContext);
    +      fileSystemWALWriter.open(fileContext);
    +
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader<T> getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Reader. This can be used to override the default wal reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader<T> fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter<T> getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter<T> fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL Entry serde
    +   */
    +  public Serde<T, byte[]> getSerde()
    +  {
    +    return serde;
    +  }
    +
    +  /**
    +   * Sets the serde which is used for wal entry serialization and de-serialization
    +   *
    +   * @param serde serializer/deserializer
    +   */
    +  public void setSerde(@NotNull Serde<T, byte[]> serde)
    +  {
    +    this.serde = Preconditions.checkNotNull(serde, "serde");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   * @param <T> type of tuple.
    +   */
    +  public static class FileSystemWALReader<T> implements WAL.WALReader<T, FileSystemWALPointer>
    +  {
    +    private T entry;
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +      //initialize the input stream
    +      inputStream = getInputStream(currentPointer);
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Path walPartPath = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      if (fileContext.util().exists(walPartPath)) {
    +        DataInputStream stream = fileContext.open(walPartPath);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = walPartPath;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public boolean advance() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null &&
    +            currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
    +          int len = inputStream.readInt();
    +          Preconditions.checkState(len >= 0, "negative length");
    +
    +          byte[] data = new byte[len];
    +          inputStream.readFully(data);
    +
    +          entry = fileSystemWAL.serde.deserialize(data);
    +          currentPointer.offset += data.length + 4;
    +          return true;
    +        }
    +      } while (nextSegment());
    +
    +      entry = null;
    +      return false;
    +    }
    +
    +    @Override
    +    public T get()
    +    {
    +      return entry;
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem WAL Writer.
    +   * @param <T> type of tuple
    +   */
    +  public static class FileSystemWALWriter<T> implements WAL.WALWriter<T>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +    private transient DataOutputStream outputStream;
    +
    +    //windowId => Latest part which can be finalized.
    +    private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
    +
    +    //part => tmp file path;
    +    private final Map<Integer, String> tmpFiles = new TreeMap<>();
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALWriter()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALWriter(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
    +      recover();
    +      if (outputStream == null) {
    +        outputStream = getOutputStream(currentPointer);
    +      }
    +    }
    +
    +    private void recover() throws IOException
    +    {
    +      LOG.debug("current pointer {}", currentPointer);
    +      String tmpFilePath = tmpFiles.get(currentPointer.getPartNum());
    +      if (tmpFilePath != null) {
    +
    +        Path tmpPath = new Path(tmpFilePath);
    +        if (fileContext.util().exists(tmpPath)) {
    +          LOG.debug("tmp path exists {}", tmpPath);
    +
    +          outputStream = getOutputStream(currentPointer);
    +          DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
    +
    +          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
    +
    +          outputStream.flush();
    +          //remove old tmp
    +          inputStreamOldTmp.close();
    +          LOG.debug("delete tmp {}", tmpPath);
    +          fileContext.delete(tmpPath, true);
    +        }
    +      }
    +
    +      //find all valid path names
    +      Set<String> validPathNames = new HashSet<>();
    +      for (Map.Entry<Integer, String> entry : tmpFiles.entrySet()) {
    +        if (entry.getKey() <= currentPointer.partNum) {
    +          validPathNames.add(new Path(entry.getValue()).getName());
    +        }
    +      }
    +      LOG.debug("valid names {}", validPathNames);
    +
    +      //there can be a failure just between the flush and the actual checkpoint which can leave some stray tmp files
    +      //which aren't accounted for in the tmp files map
    +      Path walPath = new Path(fileSystemWAL.filePath);
    +      Path parentWAL = walPath.getParent();
    +      if (parentWAL != null && fileContext.util().exists(parentWAL)) {
    +        RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
    +        while (remoteIterator.hasNext()) {
    +          FileStatus status = remoteIterator.next();
    +          String fileName = status.getPath().getName();
    +          if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
    +              !validPathNames.contains(fileName)) {
    +            LOG.debug("delete stray tmp {}", status.getPath());
    +            fileContext.delete(status.getPath(), true);
    +          }
    +
    +        }
    +      }
    +
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (outputStream != null) {
    +        flush();
    +        outputStream.close();
    +        outputStream = null;
    +        LOG.debug("closed {}", currentPointer.partNum);
    +      }
    +    }
    +
    +    @Override
    +    public int append(T entry) throws IOException
    +    {
    +      byte[] slice = fileSystemWAL.serde.serialize(entry);
    --- End diff --
    
    Is it a good idea to mix serialization in at this level? It forces a copy when the source is a Slice.
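To make the copy concern concrete: if the writer accepted a byte range instead of T, a caller holding a Slice-like view (backing array, offset, length) could write it directly and keep serialization on its own side. The sketch below assumes that alternative design. ByteSlice is a hypothetical stand-in for com.datatorrent.netlet.util.Slice, and nothing here is the PR's API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: a writer that appends raw byte ranges, leaving serialization to the caller. */
public class SliceAppendSketch
{
  /** Hypothetical stand-in for a buffer/offset/length view such as netlet's Slice. */
  static final class ByteSlice
  {
    final byte[] buffer;
    final int offset;
    final int length;

    ByteSlice(byte[] buffer, int offset, int length)
    {
      this.buffer = buffer;
      this.offset = offset;
      this.length = length;
    }
  }

  private final DataOutputStream out;

  SliceAppendSketch(DataOutputStream out)
  {
    this.out = out;
  }

  /** Writes [length][bytes] straight from the slice's backing array, with no intermediate copy. */
  int append(ByteSlice slice) throws IOException
  {
    out.writeInt(slice.length);
    out.write(slice.buffer, slice.offset, slice.length);
    return slice.length + 4;
  }

  public static void main(String[] args) throws IOException
  {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    SliceAppendSketch writer = new SliceAppendSketch(new DataOutputStream(bytes));
    byte[] shared = "headerPAYLOADtrailer".getBytes(StandardCharsets.UTF_8);
    int written = writer.append(new ByteSlice(shared, 6, 7)); // writes only "PAYLOAD"
    System.out.println(written + " " + bytes.size());
  }
}
```

The trade-off is that every caller must then serialize its own tuples, whereas the Serde-based writer in the diff keeps that logic in one place.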



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60352717
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,594 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.ConcurrentSkipListMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final ConcurrentSkipListMap<Integer, String> tempPartFiles = new ConcurrentSkipListMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the  File System WAL Reader. This can be used to override the default wal reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   *
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   */
    +  public static class FileSystemWALReader implements WAL.WALReader<FileSystemWALPointer>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        close();
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Preconditions.checkArgument(inputStream == null, "input stream not null");
    +      Path pathToReadFrom;
    +      String tmpPath = fileSystemWAL.tempPartFiles.get(walPointer.getPartNum());
    +      if (tmpPath != null) {
    +        pathToReadFrom = new Path(tmpPath);
    +      } else {
    +        pathToReadFrom = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      }
    +
    +      LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer);
    +      if (fileContext.util().exists(pathToReadFrom)) {
    +        DataInputStream stream = fileContext.open(pathToReadFrom);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = pathToReadFrom;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public Slice next() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null && !fileContext.util().exists(currentOpenPath)) {
    --- End diff --
    
    Checking whether the file exists on every call is very expensive. Why not invalidate the reader when the file is renamed?
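
    The suggestion can be sketched with an in-memory map standing in for Hadoop's `FileContext`. `CachingReader`, `rename`, and the `existenceChecks` counter are hypothetical names used only for illustration. The reader resolves its path only when it (re)opens. The writer calls `invalidate` right after renaming a temp part file, which removes the need for a per-read existence check.

```java
import java.util.Map;

public class ReaderInvalidationSketch
{
  /** Hypothetical reader that resolves its path once and keeps it until told otherwise. */
  static final class CachingReader
  {
    private final Map<String, String> files; // in-memory stand-in for the file system: path -> contents
    private String openPath;                 // path of the currently "open" part, or null
    int existenceChecks;                     // how many times the file system was consulted

    CachingReader(Map<String, String> files)
    {
      this.files = files;
    }

    String read(String tmpPath, String finalPath)
    {
      if (openPath == null) {
        // Only consult the file system when (re)opening, not on every read.
        existenceChecks++;
        openPath = files.containsKey(tmpPath) ? tmpPath : finalPath;
      }
      return files.get(openPath);
    }

    /** Drops the cached path if the writer just renamed the file it points at. */
    void invalidate(String renamedFrom)
    {
      if (renamedFrom.equals(openPath)) {
        openPath = null;
      }
    }
  }

  /** Hypothetical writer-side rename that notifies the reader instead of making it poll. */
  static void rename(Map<String, String> files, String from, String to, CachingReader reader)
  {
    files.put(to, files.remove(from));
    reader.invalidate(from);
  }
}
```

    This sketch assumes the reader and writer run on the same thread, so the notification cannot race with a read.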


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60255922
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/IOUtils.java ---
    @@ -0,0 +1,56 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.utils;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +
    +public class IOUtils
    +{
    +  /**
    +   * Utility method to copy partial data from input stream to output stream.
    +   * @param inputStream        input stream
    +   * @param inputStreamOffset  offset in the input stream till which the data is copied.
    +   * @param outputStream       output stream
    +   * @throws IOException
    +   */
    +  public static void copyPartial(InputStream inputStream, long inputStreamOffset, OutputStream outputStream)
    --- End diff --
    
    Yes, only the newer version supports specifying a range.
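
    For reference, the partial copy can be done with plain `java.io` and no range-aware library call. This is a minimal sketch that assumes the semantics in the Javadoc above: bytes are copied from the stream's current position until `inputStreamOffset` bytes have been copied or the input ends. The class name is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyPartialSketch
{
  /**
   * Copies bytes from inputStream to outputStream until inputStreamOffset bytes have been
   * copied or the input is exhausted, whichever comes first.
   */
  public static void copyPartial(InputStream inputStream, long inputStreamOffset, OutputStream outputStream)
      throws IOException
  {
    byte[] buffer = new byte[8192];
    long remaining = inputStreamOffset;
    while (remaining > 0) {
      // Never read past the requested offset.
      int toRead = (int)Math.min(buffer.length, remaining);
      int read = inputStream.read(buffer, 0, toRead);
      if (read == -1) {
        break; // input ended before the requested offset
      }
      outputStream.write(buffer, 0, read);
      remaining -= read;
    }
  }
}
```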


[GitHub] incubator-apex-malhar pull request: [ONLY FOR REVIEW] APEXMALHAR 1...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r59826248
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java ---
    @@ -0,0 +1,100 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.IOException;
    +
    +/**
    + * This interface represents a write ahead log that can be used by operator.
    + * the WAL is split into two interfaces, a WALWriter which allows writing
    + * data, and WALReader which provides iterator like interface to read entries
    + * written to the WAL.
    + *
    + * @param <READER> Type of WAL Reader
    + * @param <WRITER> WAL Pointer Type.
    + */
    +public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    +{
    +  void setup();
    +
    +  void teardown();
    +
    +  void beforeCheckpoint(long window);
    +
    +  void committed(long window);
    +
    +  READER getReader() throws IOException;
    +
    +  WRITER getWriter() throws IOException;
    +
    +  /**
    +   * Provides iterator like interface to read entries from the WAL.
    +   * @param <T> type of WAL entries
    +   * @param <P> type of Pointer in the WAL
    +   */
    +  interface WALReader<T, P>
    +  {
    +    /**
    +     * Seek to middle of the WAL. This is used primarily during recovery,
    +     * when we need to start recovering data from middle of WAL file.
    +     */
    +    void seek(P pointer) throws IOException;
    +
    +    /**
    +     * Advance WAL by one entry, returns true if it can advance, else false
    +     * in case of any other error throws an Exception.
    +     *
    +     * @return true if next data item is read successfully, false if data can not be read.
    +     * @throws IOException
    +     */
    +    boolean advance() throws IOException;
    +
    +    /**
    +     * Return current entry from WAL, returns null if end of file has reached.
    +     *
    +     * @return MutableKeyValue
    +     */
    +    T get();
    +  }
    +
    +  /**
    +   * Provide method to write entries to the WAL.
    +   * @param <T>
    +   * @param <P>
    +   */
    +  interface WALWriter<T, P>
    --- End diff --
    
    No. Will remove it unless someone comes up with a suggestion.


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60686903
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,688 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A WAL implementation that is file based.
    + * <p/>
    + * Note:<br/>
    + * The FileSystem Writer and Reader operations should not alternate because intermingling these operations will cause
    + * problems. Typically the  WAL Reader will only used in recovery.<br/>
    + *
    + * Also this implementation is thread unsafe- the filesystem wal writer and reader operations should be performed in
    + * operator's thread.
    + */
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final Map<Integer, String> tempPartFiles = new TreeMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    --- End diff --
    
    No, the HDFS issue doesn't apply here (the file is closed and not under construction). Looks good, will merge it.
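
    The point about the file being closed can be illustrated with a local-filesystem sketch that uses `java.nio.file` in place of Hadoop's `FileContext`. A part file is closed when it is rolled, and it is renamed from its temporary name only after the window it belongs to is committed. The `.tmp` naming, the window bookkeeping, and the class name are assumptions for illustration.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class FinalizeOnCommitSketch
{
  private static final String TMP_SUFFIX = ".tmp"; // assumed naming convention for in-progress parts

  /** Closed temp part file -> window in which it was closed. */
  private final Map<Path, Long> closedTempParts = new LinkedHashMap<>();

  /** Closes the part's stream first, so nothing holds the file open when it is renamed later. */
  public void closePart(OutputStream out, long window, Path tmpPath) throws IOException
  {
    out.close();
    closedTempParts.put(tmpPath, window);
  }

  /** Renames every closed temp part from a window at or before the committed window. */
  public void committed(long committedWindow) throws IOException
  {
    Iterator<Map.Entry<Path, Long>> it = closedTempParts.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Path, Long> entry = it.next();
      if (entry.getValue() <= committedWindow) {
        Path tmp = entry.getKey();
        Files.move(tmp, finalName(tmp));
        it.remove();
      }
    }
  }

  /** Strips the assumed ".tmp" suffix to get the final part file name. */
  static Path finalName(Path tmp)
  {
    String name = tmp.getFileName().toString();
    return tmp.resolveSibling(name.substring(0, name.length() - TMP_SUFFIX.length()));
  }
}
```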


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60681584
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,688 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A WAL implementation that is file based.
    + * <p/>
    + * Note:<br/>
    + * The FileSystem Writer and Reader operations should not alternate because intermingling these operations will cause
    + * problems. Typically the  WAL Reader will only used in recovery.<br/>
    + *
    + * Also this implementation is thread unsafe- the filesystem wal writer and reader operations should be performed in
    + * operator's thread.
    + */
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final Map<Integer, String> tempPartFiles = new TreeMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    --- End diff --
    
    Got it. FYI, this isn't related to the dangling lease issue, which occurs only when a file is left open.


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60275783
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.IOException;
    +
    +/**
    + * This interface represents a write ahead log that can be used by operator.
    + * the WAL is split into two interfaces, a WALWriter which allows writing
    + * data, and WALReader which provides iterator like interface to read entries
    + * written to the WAL.
    + *
    + * @param <READER> Type of WAL Reader
    + * @param <WRITER> WAL Pointer Type.
    + */
    +public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    +{
    +  void setup();
    +
    +  void teardown();
    +
    +  void beforeCheckpoint(long window);
    +
    +  void committed(long window);
    +
    +  READER getReader();
    +
    +  WRITER getWriter();
    +
    +  /**
    +   * Provides iterator like interface to read entries from the WAL.
    +   * @param <T> type of WAL entries
    +   * @param <P> type of Pointer in the WAL
    +   */
    +  interface WALReader<T, P>
    +  {
    +    /**
    +     * Seek to middle of the WAL. This is used primarily during recovery,
    +     * when we need to start recovering data from middle of WAL file.
    +     */
    +    void seek(P pointer) throws IOException;
    +
    +    /**
    +     * Advance WAL by one entry, returns true if it can advance, else false
    --- End diff --
    
    I think that will be better. Will do.
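
    The later revision of the reader in this thread exposes a single `Slice next()` in place of `advance()`/`get()`. Assuming that is the change agreed on here, a consumer's replay loop needs only one call per entry. This is a minimal sketch using hypothetical `EntryReader` and `ListReader` types.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class NextStyleReaderSketch
{
  /** Hypothetical single-call reader contract: returns the next entry, or null at the end of the WAL. */
  interface EntryReader<T>
  {
    T next() throws IOException;
  }

  /** Hypothetical in-memory reader used only to exercise the replay loop. */
  static final class ListReader<T> implements EntryReader<T>
  {
    private final Iterator<T> it;

    ListReader(List<T> entries)
    {
      it = entries.iterator();
    }

    @Override
    public T next()
    {
      return it.hasNext() ? it.next() : null;
    }
  }

  /** Replays every entry; one call per entry replaces the advance()/get() pair. */
  static <T> List<T> replay(EntryReader<T> reader) throws IOException
  {
    List<T> out = new ArrayList<>();
    T entry;
    while ((entry = reader.next()) != null) {
      out.add(entry);
    }
    return out;
  }
}
```

    Because the end of the WAL is signalled with `null`, `null` can no longer be a valid entry.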


[GitHub] incubator-apex-malhar pull request: [ONLY FOR REVIEW] APEXMALHAR 1...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r59827085
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java ---
    @@ -0,0 +1,100 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.IOException;
    +
    +/**
    + * This interface represents a write ahead log that can be used by operator.
    + * the WAL is split into two interfaces, a WALWriter which allows writing
    + * data, and WALReader which provides iterator like interface to read entries
    + * written to the WAL.
    + *
    + * @param <READER> Type of WAL Reader
    + * @param <WRITER> WAL Pointer Type.
    + */
    +public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    +{
    +  void setup();
    +
    +  void teardown();
    +
    +  void beforeCheckpoint(long window);
    +
    +  void committed(long window);
    +
    +  READER getReader() throws IOException;
    +
    +  WRITER getWriter() throws IOException;
    +
    +  /**
    +   * Provides iterator like interface to read entries from the WAL.
    +   * @param <T> type of WAL entries
    +   * @param <P> type of Pointer in the WAL
    +   */
    +  interface WALReader<T, P>
    +  {
    +    /**
    +     * Seek to middle of the WAL. This is used primarily during recovery,
    +     * when we need to start recovering data from middle of WAL file.
    +     */
    +    void seek(P pointer) throws IOException;
    +
    +    /**
    +     * Advance WAL by one entry, returns true if it can advance, else false
    +     * in case of any other error throws an Exception.
    +     *
    +     * @return true if next data item is read successfully, false if data can not be read.
    +     * @throws IOException
    +     */
    +    boolean advance() throws IOException;
    +
    +    /**
    +     * Return current entry from WAL, returns null if end of file has reached.
    +     *
    +     * @return MutableKeyValue
    +     */
    +    T get();
    +  }
    +
    +  /**
    +   * Provide method to write entries to the WAL.
    +   * @param <T>
    +   * @param <P>
    +   */
    +  interface WALWriter<T, P>
    +  {
    +    /**
    +     * Write an entry to the WAL
    +     */
    +    int append(T entry) throws IOException;
    +
    +  }
    +
    +  /**
    +   * Serializer interface used while reading and writing entries to the WAL.
    +   * @param <T>
    +   */
    +  interface Serde<T>
    --- End diff --
    
    There is already a Serde interface here https://github.com/apache/incubator-apex-malhar/blob/master/library/src/main/java/org/apache/apex/malhar/lib/spillable/Serde.java
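Reusing one generic serde, instead of nesting a new WAL.Serde<T>, could look roughly like the sketch below. The two-type-parameter shape mirrors the Serde<T, byte[]> that a later revision of this PR uses. The method names serialize/deserialize and the StringBytesSerde class are assumptions for illustration, not the actual spillable Serde API.

```java
import java.nio.charset.StandardCharsets;

public class SerdeSketch
{
  /**
   * Hypothetical generic serde: converts an object of type T to and from a serialized form S.
   */
  interface Serde<T, S>
  {
    S serialize(T object);

    T deserialize(S serialized);
  }

  /**
   * Hypothetical example serde for WAL entries that are plain UTF-8 strings.
   */
  static class StringBytesSerde implements Serde<String, byte[]>
  {
    @Override
    public byte[] serialize(String object)
    {
      return object.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] serialized)
    {
      return new String(serialized, StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args)
  {
    Serde<String, byte[]> serde = new StringBytesSerde();
    byte[] bytes = serde.serialize("entry-1");
    System.out.println(serde.deserialize(bytes)); // prints entry-1
  }
}
```

With a byte[] serialized form, the WAL itself only ever deals with byte arrays, and the entry type stays a concern of whoever configures the serde.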



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60352191
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,594 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.ConcurrentSkipListMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final ConcurrentSkipListMap<Integer, String> tempPartFiles = new ConcurrentSkipListMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the  File System WAL Reader. This can be used to override the default wal reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   *
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   */
    +  public static class FileSystemWALReader implements WAL.WALReader<FileSystemWALPointer>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        close();
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Preconditions.checkArgument(inputStream == null, "input stream not null");
    +      Path pathToReadFrom;
    +      String tmpPath = fileSystemWAL.tempPartFiles.get(walPointer.getPartNum());
    +      if (tmpPath != null) {
    +        pathToReadFrom = new Path(tmpPath);
    +      } else {
    +        pathToReadFrom = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      }
    +
    +      LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer);
    +      if (fileContext.util().exists(pathToReadFrom)) {
    +        DataInputStream stream = fileContext.open(pathToReadFrom);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = pathToReadFrom;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public Slice next() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null && !fileContext.util().exists(currentOpenPath)) {
    +          //if the tmp path was finalized the path may not exist any more
    +          close();
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null && currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
    +          int len = inputStream.readInt();
    +          Preconditions.checkState(len >= 0, "negative length");
    +
    +          byte[] data = new byte[len];
    +          inputStream.readFully(data);
    +
    +          currentPointer.offset += data.length + 4;
    +          return new Slice(data);
    +        }
    +      } while (nextSegment());
    +
    +      return null;
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem WAL Writer.
    +   */
    +  public static class FileSystemWALWriter implements WAL.WALWriter
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +    private transient DataOutputStream outputStream;
    +
    +    //windowId => Latest part which can be finalized.
    +    private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
    +
    +    private final FileSystemWAL fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALWriter()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALWriter(@NotNull FileSystemWAL fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
    +      recover();
    +    }
    +
    +    private void recover() throws IOException
    +    {
    +      LOG.debug("current point", currentPointer);
    +      String tmpFilePath = fileSystemWAL.tempPartFiles.get(currentPointer.getPartNum());
    +      if (tmpFilePath != null) {
    +
    +        Path tmpPath = new Path(tmpFilePath);
    +        if (fileContext.util().exists(tmpPath)) {
    +          LOG.debug("tmp path exists {}", tmpPath);
    +
    +          outputStream = getOutputStream(new FileSystemWALPointer(currentPointer.partNum, 0));
    +          DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
    +
    +          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
    +
    +          outputStream.flush();
    +          //remove old tmp
    +          inputStreamOldTmp.close();
    +          LOG.debug("delete tmp {}", tmpPath);
    +          fileContext.delete(tmpPath, true);
    +        }
    +      }
    +
    +      //find all valid path names
    +      Set<String> validPathNames = new HashSet<>();
    +      for (Map.Entry<Integer, String> entry : fileSystemWAL.tempPartFiles.entrySet()) {
    +        if (entry.getKey() <= currentPointer.partNum) {
    +          validPathNames.add(new Path(entry.getValue()).getName());
    +        }
    +      }
    +      LOG.debug("valid names {}", validPathNames);
    +
    +      //there can be a failure just between the flush and the actual checkpoint which can leave some stray tmp files
    +      //which aren't accounted by tmp files map
    +      Path walPath = new Path(fileSystemWAL.filePath);
    +      Path parentWAL = walPath.getParent();
    +      if (parentWAL != null && fileContext.util().exists(parentWAL)) {
    +        RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
    +        while (remoteIterator.hasNext()) {
    +          FileStatus status = remoteIterator.next();
    +          String fileName = status.getPath().getName();
    +          if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
    +              !validPathNames.contains(fileName)) {
    +            LOG.debug("delete stray tmp {}", status.getPath());
    +            fileContext.delete(status.getPath(), true);
    +          }
    +
    +        }
    +      }
    +
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (outputStream != null) {
    +        outputStream.close();
    +        outputStream = null;
    +        LOG.debug("closed {}", currentPointer.partNum);
    +      }
    +    }
    +
    +    @Override
    +    public int append(Slice entry) throws IOException
    +    {
    +      if (outputStream == null) {
    +        outputStream = getOutputStream(currentPointer);
    +      }
    +
    +      int entryLength = entry.length + 4;
    +
    +      // rotate if needed
    +      if (shouldRotate(entryLength)) {
    +        rotate(true);
    +      }
    +
    +      outputStream.writeInt(entry.length);
    +      outputStream.write(entry.toByteArray());
    --- End diff --
    
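    Should use write(byte[] b, int off, int len) instead, which writes straight from the slice's backing array and avoids the extra copy that toByteArray() may allocate.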
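A minimal sketch of the suggested append, assuming netlet's Slice exposes buffer and offset fields alongside the length field the diff already reads; in the PR the call would then be roughly outputStream.write(entry.buffer, entry.offset, entry.length). ByteView below is a hypothetical stand-in for Slice so the example runs on its own, and readEntry mirrors the reader's readInt/readFully sequence to show the length-prefixed format round-trips.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SliceWriteSketch
{
  // Hypothetical stand-in for com.datatorrent.netlet.util.Slice: a view over part of a byte array.
  static final class ByteView
  {
    final byte[] buffer;
    final int offset;
    final int length;

    ByteView(byte[] buffer, int offset, int length)
    {
      this.buffer = buffer;
      this.offset = offset;
      this.length = length;
    }
  }

  // Writes one length-prefixed entry directly from the backing array, without copying it first.
  static int append(DataOutputStream out, ByteView entry) throws IOException
  {
    out.writeInt(entry.length);
    out.write(entry.buffer, entry.offset, entry.length);
    return entry.length + 4; // 4 bytes for the int length prefix
  }

  // Reads one length-prefixed entry back.
  static byte[] readEntry(DataInputStream in) throws IOException
  {
    int len = in.readInt();
    if (len < 0) {
      throw new IOException("negative length " + len);
    }
    byte[] data = new byte[len];
    in.readFully(data);
    return data;
  }

  public static void main(String[] args) throws IOException
  {
    byte[] backing = "xxhelloyy".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    int written = append(out, new ByteView(backing, 2, 5)); // only the "hello" part of the array
    out.flush();

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(written + " " + new String(readEntry(in), StandardCharsets.UTF_8)); // prints 9 hello
  }
}
```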



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60151482
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/IOUtils.java ---
    @@ -0,0 +1,56 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.utils;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +
    +public class IOUtils
    +{
    +  /**
    +   * Utility method to copy partial data from input stream to output stream.
    +   * @param inputStream        input stream
    +   * @param inputStreamOffset  offset in the input stream till which the data is copied.
    +   * @param outputStream       output stream
    +   * @throws IOException
    +   */
    +  public static void copyPartial(InputStream inputStream, long inputStreamOffset, OutputStream outputStream)
    --- End diff --
    
    Wouldn't org.apache.commons.io.IOUtils.copyLarge work here?
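Going by its Javadoc, copyPartial copies the bytes before inputStreamOffset, which recovery uses to keep only the checkpointed prefix of an old tmp file. A plain-JDK sketch of that behavior follows; it is an illustration, not the PR's implementation, and the choice to throw on early end-of-stream is an assumption. For comparison, Commons IO 2.2+ has copyLarge(input, output, inputOffset, length), so copyLarge(in, out, 0, inputStreamOffset) should copy the same prefix, but it returns the byte count instead of failing on a short stream, so a caller would need to check that count.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyPartialSketch
{
  // Copies the first `length` bytes of `in` to `out`; fails if the stream ends early.
  static void copyPartial(InputStream in, long length, OutputStream out) throws IOException
  {
    byte[] buffer = new byte[8192];
    long remaining = length;
    while (remaining > 0) {
      int read = in.read(buffer, 0, (int)Math.min(buffer.length, remaining));
      if (read == -1) {
        throw new IOException("stream ended " + remaining + " bytes early");
      }
      out.write(buffer, 0, read);
      remaining -= read;
    }
  }

  public static void main(String[] args) throws IOException
  {
    // Simulates recovery: keep the checkpointed prefix of a tmp file and drop the tail.
    byte[] tmpFile = {1, 2, 3, 4, 5, 6};
    ByteArrayOutputStream recovered = new ByteArrayOutputStream();
    copyPartial(new ByteArrayInputStream(tmpFile), 4, recovered);
    System.out.println(recovered.size()); // prints 4
  }
}
```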



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60680461
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,688 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A WAL implementation that is file based.
    + * <p/>
    + * Note:<br/>
    + * The FileSystem Writer and Reader operations should not alternate because intermingling these operations will cause
    + * problems. Typically the  WAL Reader will only used in recovery.<br/>
    + *
    + * Also this implementation is thread unsafe- the filesystem wal writer and reader operations should be performed in
    + * operator's thread.
    + */
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final Map<Integer, String> tempPartFiles = new TreeMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    --- End diff --
    
    This is because, after a failure, the operator is restored to a checkpoint and may expect to keep writing to a tmp file that no longer exists if it was renamed right after being closed.
    
    For example, the operator starts writing to WAL_x_tmp in window 59, and the operator is checkpointed.
    In window 60 it finishes writing to WAL_x_tmp and closes it. If we rename the file right then and a failure follows, the operator is restored to its window 59 state and expects WAL_x_tmp to still exist.
    
    This could be handled programmatically, but we avoided writing to the final target file until we were certain it would not be opened again, because HDFS had a bug where a rename could leave a dangling lease on the target.
    
    AbstractFileOutputOperator does the same: finalizing files is deferred to committed() because until then we may still write to the same file.
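A minimal in-memory sketch of the deferral described above, assuming a pendingFinalization map like the one in FileSystemWALWriter (windowId to the latest part that can be finalized). The file map, path names and the ".tmp" suffix are hypothetical stand-ins for FileContext, the real tmp naming and the real rename; the window numbers follow the WAL_x_tmp example.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class DeferredFinalizeSketch
{
  // windowId -> highest part number closed in that window (analogous to pendingFinalization).
  private final TreeMap<Long, Integer> pendingFinalization = new TreeMap<>();

  // Hypothetical in-memory stand-in for the file system: path -> contents.
  final Map<String, String> files = new HashMap<>();

  private final String basePath;
  private int lastFinalizedPart = -1;

  DeferredFinalizeSketch(String basePath)
  {
    this.basePath = basePath;
  }

  String tmpPath(int part)
  {
    return basePath + "_" + part + ".tmp"; // hypothetical tmp naming
  }

  String finalPath(int part)
  {
    return basePath + "_" + part;
  }

  // A part was closed in windowId (e.g. on rotation); its tmp file is deliberately NOT renamed yet.
  void partClosed(long windowId, int part)
  {
    pendingFinalization.put(windowId, part);
  }

  // Once a window is committed the operator is never restored to an earlier checkpoint, so parts
  // closed in windows <= committedWindow will not be reopened and can be renamed to final names.
  void committed(long committedWindow)
  {
    Iterator<Map.Entry<Long, Integer>> it =
        pendingFinalization.headMap(committedWindow, true).entrySet().iterator();
    while (it.hasNext()) {
      int upToPart = it.next().getValue();
      for (int part = lastFinalizedPart + 1; part <= upToPart; part++) {
        String contents = files.remove(tmpPath(part));
        if (contents != null) {
          files.put(finalPath(part), contents); // stands in for a file system rename
        }
      }
      lastFinalizedPart = Math.max(lastFinalizedPart, upToPart);
      it.remove(); // also removes the entry from pendingFinalization
    }
  }

  public static void main(String[] args)
  {
    DeferredFinalizeSketch wal = new DeferredFinalizeSketch("/wal/WAL");
    wal.files.put(wal.tmpPath(0), "part-0");
    wal.partClosed(60, 0); // part 0 closed in window 60, still under its tmp name

    wal.committed(59); // a restore to window 59 could still reopen part 0
    System.out.println(wal.files.containsKey(wal.tmpPath(0))); // prints true

    wal.committed(60); // window 60 committed, part 0 is now safe to rename
    System.out.println(wal.files.containsKey(wal.finalPath(0))); // prints true
  }
}
```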




[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60288368
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALReader.open(fileContext);
    +      fileSystemWALWriter.open(fileContext);
    +
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader<T> getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the  File System WAL Reader. This can be used to override the default wal reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader<T> fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter<T> getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter<T> fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL Entry serde
    +   */
    +  public Serde<T, byte[]> getSerde()
    +  {
    +    return serde;
    +  }
    +
    +  /**
    +   * Sets the serde which is used for wal entry serialization and de-serialization
    +   *
    +   * @param serde serializer/deserializer
    +   */
    +  public void setSerde(@NotNull Serde<T, byte[]> serde)
    +  {
    +    this.serde = Preconditions.checkNotNull(serde, "serde");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   * @param <T> type of tuple.
    +   */
    +  public static class FileSystemWALReader<T> implements WAL.WALReader<T, FileSystemWALPointer>
    +  {
    +    private T entry;
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +      //initialize the input stream
    +      inputStream = getInputStream(currentPointer);
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Path walPartPath = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      if (fileContext.util().exists(walPartPath)) {
    +        DataInputStream stream = fileContext.open(walPartPath);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = walPartPath;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public boolean advance() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null &&
    +            currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
    +          int len = inputStream.readInt();
    +          Preconditions.checkState(len >= 0, "negative length");
    +
    +          byte[] data = new byte[len];
    +          inputStream.readFully(data);
    +
    +          entry = fileSystemWAL.serde.deserialize(data);
    +          currentPointer.offset += data.length + 4;
    +          return true;
    +        }
    +      } while (nextSegment());
    +
    +      entry = null;
    +      return false;
    +    }
    +
    +    @Override
    +    public T get()
    +    {
    +      return entry;
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem WAL Writer.
    +   * @param <T> type of tuple
    +   */
    +  public static class FileSystemWALWriter<T> implements WAL.WALWriter<T>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +    private transient DataOutputStream outputStream;
    +
    +    //windowId => Latest part which can be finalized.
    +    private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
    +
    +    //part => tmp file path;
    +    private final Map<Integer, String> tmpFiles = new TreeMap<>();
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALWriter()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALWriter(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
    +      recover();
    +      if (outputStream == null) {
    +        outputStream = getOutputStream(currentPointer);
    +      }
    +    }
    +
    +    private void recover() throws IOException
    +    {
    +      LOG.debug("current pointer {}", currentPointer);
    +      String tmpFilePath = tmpFiles.get(currentPointer.getPartNum());
    +      if (tmpFilePath != null) {
    +
    +        Path tmpPath = new Path(tmpFilePath);
    +        if (fileContext.util().exists(tmpPath)) {
    +          LOG.debug("tmp path exists {}", tmpPath);
    +
    +          outputStream = getOutputStream(currentPointer);
    +          DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
    +
    +          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
    +
    +          outputStream.flush();
    +          //remove old tmp
    +          inputStreamOldTmp.close();
    +          LOG.debug("delete tmp {}", tmpPath);
    +          fileContext.delete(tmpPath, true);
    +        }
    +      }
    +
    +      //find all valid path names
    +      Set<String> validPathNames = new HashSet<>();
    +      for (Map.Entry<Integer, String> entry : tmpFiles.entrySet()) {
    +        if (entry.getKey() <= currentPointer.partNum) {
    +          validPathNames.add(new Path(entry.getValue()).getName());
    +        }
    +      }
    +      LOG.debug("valid names {}", validPathNames);
    +
    +      //a failure between the flush and the actual checkpoint can leave stray tmp files
    +      //that are not accounted for in the tmpFiles map
    +      Path walPath = new Path(fileSystemWAL.filePath);
    +      Path parentWAL = walPath.getParent();
    +      if (parentWAL != null && fileContext.util().exists(parentWAL)) {
    +        RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
    +        while (remoteIterator.hasNext()) {
    +          FileStatus status = remoteIterator.next();
    +          String fileName = status.getPath().getName();
    +          if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
    +              !validPathNames.contains(fileName)) {
    +            LOG.debug("delete stray tmp {}", status.getPath());
    +            fileContext.delete(status.getPath(), true);
    +          }
    +
    +        }
    +      }
    +
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (outputStream != null) {
    +        flush();
    +        outputStream.close();
    +        outputStream = null;
    +        LOG.debug("closed {}", currentPointer.partNum);
    +      }
    +    }
    +
    +    @Override
    +    public int append(T entry) throws IOException
    +    {
    +      byte[] slice = fileSystemWAL.serde.serialize(entry);
    +      int entryLength = slice.length + 4;
    +
    +      // rotate if needed
    +      if (shouldRotate(entryLength)) {
    +        rotate(true);
    +      }
    +
    +      outputStream.writeInt(slice.length);
    +      outputStream.write(slice);
    +      currentPointer.offset += entryLength;
    +
    +      if (currentPointer.offset == fileSystemWAL.maxLength) {
    +        //the file is full, so rotate it now instead of waiting for the next entry
    +        rotate(false);
    +      }
    +
    +      return entryLength;
    +    }
    +
    +    protected void flush() throws IOException
    +    {
    +      if (outputStream != null) {
    +        outputStream.flush();
    +        if (outputStream instanceof FSDataOutputStream) {
    +          ((FSDataOutputStream)outputStream).hflush();
    --- End diff --
    
    We do not need both ```outputStream.flush()``` and ```outputStream.hflush()```.
    I will change this to:
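    ```
        protected void flush() throws IOException
        {
          if (outputStream != null) {
            if (outputStream instanceof Syncable) {
              // e.g. FSDataOutputStream: push buffered data to the file system and persist it
              Syncable syncableOutputStream = (Syncable)outputStream;
              syncableOutputStream.hflush();
              syncableOutputStream.hsync();
            } else {
              // streams without Syncable support only offer a regular flush
              outputStream.flush();
            }
          }
        }
    ```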
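
    A minimal sketch of the dispatch proposed above, made runnable without Hadoop. SyncableSink is a hypothetical stand-in for org.apache.hadoop.fs.Syncable, and WalFlush and RecordingStream are illustration-only names. Since the Syncable javadoc describes hsync() as flushing the client buffer all the way to the disk device (a stronger guarantee than hflush()), the sketch assumes a single hsync() call is enough on the durable path.

    ```java
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for org.apache.hadoop.fs.Syncable so this runs without Hadoop.
    interface SyncableSink {
      void hflush() throws IOException; // make written data visible to new readers
      void hsync() throws IOException;  // also ask the file system to persist it to disk
    }

    final class WalFlush {
      private WalFlush() {
      }

      // Flushes a WAL output stream, taking the durable path when the stream supports it.
      static void flush(OutputStream out) throws IOException {
        if (out == null) {
          return; // nothing open, mirroring the null check in the proposal
        }
        if (out instanceof SyncableSink) {
          // assumption: hsync() subsumes hflush(), so it is called on its own here
          ((SyncableSink)out).hsync();
        } else {
          out.flush();
        }
      }
    }

    // Test double that records which flush method was invoked.
    final class RecordingStream extends ByteArrayOutputStream implements SyncableSink {
      final List<String> calls = new ArrayList<>();

      @Override
      public void flush() {
        calls.add("flush");
      }

      @Override
      public void hflush() {
        calls.add("hflush");
      }

      @Override
      public void hsync() {
        calls.add("hsync");
      }
    }
    ```

    With this shape, the non-Syncable branch behaves exactly like a plain OutputStream.flush(), so local-file-system tests and HDFS share one code path.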




[GitHub] incubator-apex-malhar pull request: [ONLY FOR REVIEW] APEXMALHAR 1...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r59826483
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java ---
    @@ -0,0 +1,100 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.IOException;
    +
    +/**
    + * This interface represents a write-ahead log that can be used by an operator.
    + * The WAL is split into two interfaces: a WALWriter, which allows writing
    + * data, and a WALReader, which provides an iterator-like interface to read entries
    + * written to the WAL.
    + *
    + * @param <READER> Type of WAL Reader
    + * @param <WRITER> Type of WAL Writer
    + */
    +public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    +{
    +  void setup();
    +
    +  void teardown();
    +
    +  void beforeCheckpoint(long window);
    +
    +  void committed(long window);
    +
    +  READER getReader() throws IOException;
    +
    +  WRITER getWriter() throws IOException;
    +
    +  /**
    +   * Provides iterator like interface to read entries from the WAL.
    +   * @param <T> type of WAL entries
    +   * @param <P> type of Pointer in the WAL
    +   */
    +  interface WALReader<T, P>
    +  {
    +    /**
    +     * Seek to a position in the middle of the WAL. This is used primarily during recovery,
    +     * when data must be recovered starting from the middle of a WAL file.
    +     */
    +    void seek(P pointer) throws IOException;
    +
    +    /**
    +     * Advance the WAL by one entry. Returns true if it can advance, false otherwise;
    +     * throws an exception in case of any other error.
    +     *
    +     * @return true if next data item is read successfully, false if data can not be read.
    +     * @throws IOException
    +     */
    +    boolean advance() throws IOException;
    +
    +    /**
    +     * Return the current entry from the WAL, or null if the end of the WAL has been reached.
    +     *
    +     * @return the current entry, or null at the end of the WAL
    +     */
    +    T get();
    +  }
    +
    +  /**
    +   * Provides methods to write entries to the WAL.
    +   * @param <T> type of WAL entries
    +   * @param <P> type of Pointer in the WAL
    +   */
    +  interface WALWriter<T, P>
    +  {
    +    /**
    +     * Write an entry to the WAL
    +     */
    +    int append(T entry) throws IOException;
    --- End diff --
    
    For a write-ahead log, IMO appending one entry at a time will be the most common case. Bulk append support can be added later, though.
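
    A minimal sketch of why bulk append can come later: it can be layered on the single-entry append without changing the on-disk format. The framing (a 4-byte length prefix via writeInt, then the serialized bytes, with the offset advanced by length + 4) follows FileSystemWALWriter.append() in the diff; FramedWalWriter and appendAll are hypothetical names, and entries are taken as already-serialized byte arrays instead of going through a Serde.

    ```java
    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Illustration-only writer using the framing from FileSystemWALWriter.append():
    // a 4-byte length prefix followed by the serialized entry.
    final class FramedWalWriter {
      private final DataOutputStream out;
      private long offset; // bytes written so far, like currentPointer.offset

      FramedWalWriter(DataOutputStream out) {
        this.out = out;
      }

      // Appends one entry; returns the bytes it occupies (payload plus length prefix).
      int append(byte[] entry) throws IOException {
        out.writeInt(entry.length);
        out.write(entry);
        int entryLength = entry.length + 4;
        offset += entryLength;
        return entryLength;
      }

      // Hypothetical bulk append layered on append(); not part of the WAL interface under review.
      int appendAll(Iterable<byte[]> entries) throws IOException {
        int total = 0;
        for (byte[] entry : entries) {
          total += append(entry);
        }
        return total;
      }

      long offset() {
        return offset;
      }

      // Reads framed entries back until the stream ends cleanly between records.
      static List<byte[]> readAll(byte[] data) throws IOException {
        List<byte[]> entries = new ArrayList<>();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        while (true) {
          int len;
          try {
            len = in.readInt();
          } catch (EOFException endOfLog) {
            return entries;
          }
          byte[] payload = new byte[len];
          in.readFully(payload);
          entries.add(payload);
        }
      }
    }
    ```

    Because every record is self-delimiting, a reader cannot tell whether records were written one at a time or in a batch, which keeps a later bulk API backward compatible.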



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60257017
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.IOException;
    +
    +/**
    + * This interface represents a write-ahead log that can be used by an operator.
    + * The WAL is split into two interfaces: a WALWriter, which allows writing
    + * data, and a WALReader, which provides an iterator-like interface to read entries
    + * written to the WAL.
    + *
    + * @param <READER> Type of WAL Reader
    + * @param <WRITER> Type of WAL Writer
    + */
    +public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
    +{
    +  void setup();
    +
    +  void teardown();
    +
    +  void beforeCheckpoint(long window);
    +
    +  void committed(long window);
    +
    +  READER getReader();
    +
    +  WRITER getWriter();
    +
    +  /**
    +   * Provides iterator like interface to read entries from the WAL.
    +   * @param <T> type of WAL entries
    +   * @param <P> type of Pointer in the WAL
    +   */
    +  interface WALReader<T, P>
    +  {
    +    /**
    +     * Seek to a position in the middle of the WAL. This is used primarily during recovery,
    +     * when data must be recovered starting from the middle of a WAL file.
    +     */
    +    void seek(P pointer) throws IOException;
    +
    +    /**
    +     * Advance WAL by one entry, returns true if it can advance, else false
    --- End diff --
    
    Why separate advance and get?
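
    A minimal sketch of what the split buys and how it maps onto java.util.Iterator. EntryCursor is a hypothetical local interface with the same advance()/get() shape as WAL.WALReader; ListCursor and CursorIterator are illustration-only names. The sketch shows two properties of the split: get() does no I/O and can be called repeatedly without moving the cursor, and only advance() needs to throw IOException, whereas Iterator.next() cannot throw checked exceptions.

    ```java
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    // Hypothetical local interface with the same advance()/get() shape as WAL.WALReader.
    interface EntryCursor<T> {
      boolean advance() throws IOException; // move to the next entry; false at the end
      T get();                              // current entry; no I/O, safe to call repeatedly
    }

    // In-memory cursor for illustration; get() returns null once the end is reached.
    final class ListCursor<T> implements EntryCursor<T> {
      private final List<T> items;
      private int index = -1;
      private T current;

      ListCursor(List<T> items) {
        this.items = items;
      }

      @Override
      public boolean advance() {
        if (index + 1 < items.size()) {
          current = items.get(++index);
          return true;
        }
        current = null;
        return false;
      }

      @Override
      public T get() {
        return current;
      }
    }

    // Adapter to java.util.Iterator, which folds advance() and get() into next().
    final class CursorIterator<T> implements Iterator<T> {
      private final EntryCursor<T> cursor;
      private Boolean pending; // null means the cursor has not been advanced for this element yet

      CursorIterator(EntryCursor<T> cursor) {
        this.cursor = cursor;
      }

      @Override
      public boolean hasNext() {
        if (pending == null) {
          try {
            pending = cursor.advance();
          } catch (IOException e) {
            throw new UncheckedIOException(e); // Iterator methods cannot throw checked exceptions
          }
        }
        return pending;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        pending = null; // the next hasNext() call advances again
        return cursor.get();
      }
    }
    ```

    A combined next() would be simpler for callers, but it would either have to return null as an end marker or wrap the IOException, as the adapter above does.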




[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60456872
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    --- End diff --
    
    @tweise But does it make sense to have a maxLength that is not a multiple of the FS block size?



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60334158
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    --- End diff --
    
    From my understanding, a file in HDFS always takes up a multiple of the block size, even if the file contains just 1 byte, which is the main reason HDFS is not suitable for many small files. If that's true, how about letting the user specify a multiple of the file system's block size instead of maxLength?
    
    http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
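
    A minimal sketch of the two sizing options discussed here, using hypothetical names (WalPartSizing, blockMultiple). The block size would come from the same call setup() uses, fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize(). One hedged caveat: as far as I know, HDFS does not reserve a full block of disk for a file smaller than a block, and the linked post attributes the small-files cost mainly to NameNode memory per file and block object and to seek-heavy access. So block-aligned parts mostly reduce the number of files and blocks rather than reclaim disk space.

    ```java
    // Illustration-only sizing helpers for WAL part files; blockSize would come from
    // fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize() as in setup().
    final class WalPartSizing {
      private WalPartSizing() {
      }

      // Proposal: the user sets a hypothetical blockMultiple instead of maxLength.
      static long maxLengthFromBlocks(long blockSize, int blockMultiple) {
        if (blockSize <= 0) {
          throw new IllegalArgumentException("blockSize must be positive");
        }
        if (blockMultiple < 1) {
          throw new IllegalArgumentException("blockMultiple must be at least 1");
        }
        return Math.multiplyExact(blockSize, (long)blockMultiple);
      }

      // Alternative: keep maxLength but round it up to the next block boundary (at least one block).
      static long roundUpToBlocks(long maxLength, long blockSize) {
        if (blockSize <= 0) {
          throw new IllegalArgumentException("blockSize must be positive");
        }
        if (maxLength < 0) {
          throw new IllegalArgumentException("maxLength must be non-negative");
        }
        // division-based ceiling avoids overflow from maxLength + blockSize - 1
        long blocks = maxLength / blockSize + (maxLength % blockSize == 0 ? 0 : 1);
        return Math.multiplyExact(Math.max(1L, blocks), blockSize);
      }
    }
    ```

    Keeping maxLength and rounding it up preserves the existing property, while the multiplier makes misaligned sizes impossible to configure in the first place.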



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/242



[GitHub] incubator-apex-malhar pull request: [ONLY FOR REVIEW] APEXMALHAR 1...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r59827338
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,504 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  private long maxLength;
    --- End diff --
    
    Annotate this field with @Min(0).
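
    A minimal sketch of why the bound is 0 rather than 1: setup() treats maxLength == 0 as "use the file system's default block size", so 0 must stay legal while negative values are rejected. WalLengthPolicy is a hypothetical name, the negative check stands in for what @Min(0) would enforce during validation, and the body of shouldRotate() is not part of the quoted diff, so the rotation rule here is an assumption.

    ```java
    // Illustration-only policy showing how a @Min(0)-validated maxLength could be consumed.
    final class WalLengthPolicy {
      private final long maxLength;

      // configured: the maxLength property; defaultBlockSize: used when configured == 0, as in setup()
      WalLengthPolicy(long configured, long defaultBlockSize) {
        if (configured < 0) {
          // @Min(0) would reject this during operator validation; checked here by hand
          throw new IllegalArgumentException("maxLength must be >= 0");
        }
        this.maxLength = configured == 0 ? defaultBlockSize : configured;
      }

      long maxLength() {
        return maxLength;
      }

      // Assumed rotation rule: start a new part if the entry would overflow a non-empty part.
      boolean shouldRotate(long currentOffset, int entryLength) {
        return currentOffset > 0 && currentOffset + entryLength > maxLength;
      }
    }
    ```

    The currentOffset > 0 guard lets an empty part accept an entry larger than maxLength instead of rotating forever.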



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60335960
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    --- End diff --
    
    These are not long-lived files. They are purged as soon as the data is flushed, so you end up with a small number of small files.
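The following in-memory sketch shows why the parts stay small. Each part is capped at `maxLength` and named `filePath + "_" + partNumber`. Each record uses the length-prefixed format from `append()`/`advance()` in the diff: a 4-byte `int` length followed by the serialized bytes. The class name `InMemoryWalSketch` is hypothetical. `ByteArrayOutputStream` stands in for a Hadoop file. The rotation rule is an assumption, because the diff does not show the body of `shouldRotate()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class InMemoryWalSketch
{
  private final String filePath;
  private final long maxLength;
  // One buffer per part; index i stands in for the file filePath + "_" + i.
  private final List<ByteArrayOutputStream> parts = new ArrayList<>();
  private DataOutputStream out;
  private long offset;

  InMemoryWalSketch(String filePath, long maxLength)
  {
    this.filePath = filePath;
    this.maxLength = maxLength;
    rotate();
  }

  String partFilePath(int partNumber)
  {
    return filePath + "_" + partNumber;
  }

  int partCount()
  {
    return parts.size();
  }

  private void rotate()
  {
    ByteArrayOutputStream part = new ByteArrayOutputStream();
    parts.add(part);
    out = new DataOutputStream(part);
    offset = 0;
  }

  // Appends a length-prefixed record and returns the number of bytes written.
  int append(byte[] entry) throws IOException
  {
    int entryLength = entry.length + 4; // 4 bytes for the int length prefix
    // Assumed rule: start a new part if this entry would overflow a non-empty part.
    // An entry larger than maxLength is still written to an empty part.
    if (offset > 0 && offset + entryLength > maxLength) {
      rotate();
    }
    out.writeInt(entry.length);
    out.write(entry);
    offset += entryLength;
    return entryLength;
  }

  // Reads every record back, part by part, in write order.
  List<byte[]> readAll() throws IOException
  {
    List<byte[]> entries = new ArrayList<>();
    for (ByteArrayOutputStream part : parts) {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(part.toByteArray()));
      while (in.available() > 0) {
        byte[] data = new byte[in.readInt()];
        in.readFully(data);
        entries.add(data);
      }
    }
    return entries;
  }
}
```

With `maxLength = 20`, three 10-byte entries (14 bytes each on disk) land in three separate parts.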


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60682134
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,688 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A WAL implementation that is file based.
    + * <p/>
    + * Note:<br/>
    + * The FileSystem Writer and Reader operations should not alternate, because intermingling these operations will cause
    + * problems. Typically the WAL Reader will only be used during recovery.<br/>
    + *
    + * Also, this implementation is not thread-safe: the filesystem WAL writer and reader operations should be performed in
    + * the operator's thread.
    + */
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final Map<Integer, String> tempPartFiles = new TreeMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    --- End diff --
    
    @chandnisingh It may be useful to document the above; otherwise it's not immediately obvious why we need committed().
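Our reading of the diff is as follows. The writer's `pendingFinalization` map (windowId => latest part that can be finalized) exists because a part should only be finalized once the window that completed it is committed. Until then, recovery can still restore an earlier checkpoint and rewind into that part. The sketch below models only that bookkeeping. `FinalizationTracker` and its method signatures are hypothetical, not the PR's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class FinalizationTracker
{
  // windowId => latest part number that may be finalized once that window is committed
  private final NavigableMap<Long, Integer> pendingFinalization = new TreeMap<>();
  private int lastFinalizedPart = -1;

  // Hypothetical hook: records the highest completed part for a checkpointed window.
  void beforeCheckpoint(long windowId, int latestCompletedPart)
  {
    pendingFinalization.put(windowId, latestCompletedPart);
  }

  // Returns the part numbers that become safe to finalize when windowId is committed.
  List<Integer> committed(long windowId)
  {
    List<Integer> toFinalize = new ArrayList<>();
    NavigableMap<Long, Integer> ready = pendingFinalization.headMap(windowId, true);
    for (int part : ready.values()) {
      while (lastFinalizedPart < part) {
        lastFinalizedPart++;
        toFinalize.add(lastFinalizedPart);
      }
    }
    ready.clear(); // the view is backed by the map, so committed windows are removed
    return toFinalize;
  }
}
```

Committing window 2 releases every part recorded for windows 1 and 2 at once. Windows checkpointed after the committed one stay pending.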


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60343924
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    --- End diff --
    
    Yes, once the data is flushed, completed WAL segments need to be deleted.
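Below is a minimal local-file sketch of that purge step. It assumes the naming scheme from `getPartFilePath()` (`filePath + "_" + partNumber`) and uses `java.nio.file` in place of Hadoop's `FileContext`. The `purgeUpTo` method and the idea of passing in the first still-needed part are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class WalSegmentPurger
{
  private WalSegmentPurger()
  {
  }

  // Deletes parts 0 .. (firstNeededPart - 1), i.e. segments whose data has already
  // been flushed downstream, and returns how many files were actually removed.
  static int purgeUpTo(Path walFile, int firstNeededPart) throws IOException
  {
    int deleted = 0;
    for (int part = 0; part < firstNeededPart; part++) {
      // Same naming as FileSystemWAL.getPartFilePath(): filePath + "_" + partNumber
      Path partPath = walFile.resolveSibling(walFile.getFileName() + "_" + part);
      if (Files.deleteIfExists(partPath)) {
        deleted++;
      }
    }
    return deleted;
  }
}
```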


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60319166
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALReader.open(fileContext);
    +      fileSystemWALWriter.open(fileContext);
    +
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader<T> getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the  File System WAL Reader. This can be used to override the default wal reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader<T> fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter<T> getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter<T> fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL Entry serde
    +   */
    +  public Serde<T, byte[]> getSerde()
    +  {
    +    return serde;
    +  }
    +
    +  /**
    +   * Sets the serde which is used for wal entry serialization and de-serialization
    +   *
    +   * @param serde serializer/deserializer
    +   */
    +  public void setSerde(@NotNull Serde<T, byte[]> serde)
    +  {
    +    this.serde = Preconditions.checkNotNull(serde, "serde");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   * @param <T> type of tuple.
    +   */
    +  public static class FileSystemWALReader<T> implements WAL.WALReader<T, FileSystemWALPointer>
    +  {
    +    private T entry;
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +      //initialize the input stream
    +      inputStream = getInputStream(currentPointer);
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Path walPartPath = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      if (fileContext.util().exists(walPartPath)) {
    +        DataInputStream stream = fileContext.open(walPartPath);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = walPartPath;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public boolean advance() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null &&
    +            currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
    +          int len = inputStream.readInt();
    +          Preconditions.checkState(len >= 0, "negative length");
    +
    +          byte[] data = new byte[len];
    +          inputStream.readFully(data);
    +
    +          entry = fileSystemWAL.serde.deserialize(data);
    +          currentPointer.offset += data.length + 4;
    +          return true;
    +        }
    +      } while (nextSegment());
    +
    +      entry = null;
    +      return false;
    +    }
    +
    +    @Override
    +    public T get()
    +    {
    +      return entry;
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem WAL Writer.
    +   * @param <T> type of tuple
    +   */
    +  public static class FileSystemWALWriter<T> implements WAL.WALWriter<T>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +    private transient DataOutputStream outputStream;
    +
    +    //windowId => Latest part which can be finalized.
    +    private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
    +
    +    //part => tmp file path;
    +    private final Map<Integer, String> tmpFiles = new TreeMap<>();
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALWriter()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALWriter(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
    +      recover();
    +      if (outputStream == null) {
    +        outputStream = getOutputStream(currentPointer);
    +      }
    +    }
    +
    +    private void recover() throws IOException
    +    {
    +      LOG.debug("current pointer {}", currentPointer);
    +      String tmpFilePath = tmpFiles.get(currentPointer.getPartNum());
    +      if (tmpFilePath != null) {
    +
    +        Path tmpPath = new Path(tmpFilePath);
    +        if (fileContext.util().exists(tmpPath)) {
    +          LOG.debug("tmp path exists {}", tmpPath);
    +
    +          outputStream = getOutputStream(currentPointer);
    +          DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
    +
    +          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
    +
    +          outputStream.flush();
    +          //remove old tmp
    +          inputStreamOldTmp.close();
    +          LOG.debug("delete tmp {}", tmpPath);
    +          fileContext.delete(tmpPath, true);
    +        }
    +      }
    +
    +      //find all valid path names
    +      Set<String> validPathNames = new HashSet<>();
    +      for (Map.Entry<Integer, String> entry : tmpFiles.entrySet()) {
    +        if (entry.getKey() <= currentPointer.partNum) {
    +          validPathNames.add(new Path(entry.getValue()).getName());
    +        }
    +      }
    +      LOG.debug("valid names {}", validPathNames);
    +
    +      //there can be a failure just between the flush and the actual checkpoint which can leave some stray tmp files
    +      //which aren't accounted by tmp files map
    +      Path walPath = new Path(fileSystemWAL.filePath);
    +      Path parentWAL = walPath.getParent();
    +      if (parentWAL != null && fileContext.util().exists(parentWAL)) {
    +        RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
    +        while (remoteIterator.hasNext()) {
    +          FileStatus status = remoteIterator.next();
    +          String fileName = status.getPath().getName();
    +          if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
    +              !validPathNames.contains(fileName)) {
    +            LOG.debug("delete stray tmp {}", status.getPath());
    +            fileContext.delete(status.getPath(), true);
    +          }
    +
    +        }
    +      }
    +
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (outputStream != null) {
    +        flush();
    +        outputStream.close();
    +        outputStream = null;
    +        LOG.debug("closed {}", currentPointer.partNum);
    +      }
    +    }
    +
    +    @Override
    +    public int append(T entry) throws IOException
    +    {
    +      byte[] slice = fileSystemWAL.serde.serialize(entry);
    +      int entryLength = slice.length + 4;
    +
    +      // rotate if needed
    +      if (shouldRotate(entryLength)) {
    +        rotate(true);
    +      }
    +
    +      outputStream.writeInt(slice.length);
    +      outputStream.write(slice);
    +      currentPointer.offset += entryLength;
    +
    +      if (currentPointer.offset == fileSystemWAL.maxLength) {
    +        //if the file is completed then we can rotate it. do not have to wait for next entry
    +        rotate(false);
    +      }
    +
    +      return entryLength;
    +    }
    +
    +    protected void flush() throws IOException
    +    {
    +      if (outputStream != null) {
    +        outputStream.flush();
    +        if (outputStream instanceof FSDataOutputStream) {
    +          ((FSDataOutputStream)outputStream).hflush();
    --- End diff --
    
    ```
    if (outputStream instanceof Syncable) {
      Syncable syncableOutputStream = (Syncable)outputStream;
      // hflush makes the data visible to new readers; hsync also persists it to disk
      syncableOutputStream.hflush();
      syncableOutputStream.hsync();
    } else {
      //On local file system the write stream needs to be closed for the reader to see any data
      outputStream.close();
    }
    ```
    Does the above look good? On the local file system, the reader doesn't see any data unless the output stream is closed.
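For comparison, plain JDK streams separate the same two concerns. `flush()` hands buffered bytes to the OS, which makes them visible to another reader of the same local file. `FileDescriptor.sync()` additionally asks the OS to persist them to the device. This is roughly the `hflush()`/`hsync()` distinction on HDFS. The sketch does not reproduce the behavior of Hadoop's local file system classes. It only shows that, with a plain `FileOutputStream`, a reader sees a length-prefixed record after a flush without the writer being closed. The class and method names are hypothetical.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class LocalFlushSketch
{
  private LocalFlushSketch()
  {
  }

  // Writes one length-prefixed record, flushes and syncs it, and returns the file
  // length as observed through a separate call while the writer is still open.
  static long writeAndObserve(Path file, byte[] record) throws IOException
  {
    FileOutputStream fileOut = new FileOutputStream(file.toFile());
    try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(fileOut))) {
      out.writeInt(record.length);
      out.write(record);
      out.flush();             // push buffered bytes to the OS: visible to other readers
      fileOut.getFD().sync();  // ask the OS to persist them to the storage device
      return Files.size(file); // observed before the writer is closed
    }
  }
}
```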


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60335014
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALReader.open(fileContext);
    +      fileSystemWALWriter.open(fileContext);
    +
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader<T> getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the File System WAL reader. This can be used to override the default WAL reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader<T> fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter<T> getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter<T> fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL Entry serde
    +   */
    +  public Serde<T, byte[]> getSerde()
    +  {
    +    return serde;
    +  }
    +
    +  /**
    +   * Sets the serde which is used for wal entry serialization and de-serialization
    +   *
    +   * @param serde serializer/deserializer
    +   */
    +  public void setSerde(@NotNull Serde<T, byte[]> serde)
    +  {
    +    this.serde = Preconditions.checkNotNull(serde, "serde");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   * @param <T> type of tuple.
    +   */
    +  public static class FileSystemWALReader<T> implements WAL.WALReader<T, FileSystemWALPointer>
    +  {
    +    private T entry;
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +      //initialize the input stream
    +      inputStream = getInputStream(currentPointer);
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Path walPartPath = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      if (fileContext.util().exists(walPartPath)) {
    +        DataInputStream stream = fileContext.open(walPartPath);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = walPartPath;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public boolean advance() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null &&
    +            currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
    +          int len = inputStream.readInt();
    +          Preconditions.checkState(len >= 0, "negative length");
    +
    +          byte[] data = new byte[len];
    +          inputStream.readFully(data);
    +
    +          entry = fileSystemWAL.serde.deserialize(data);
    +          currentPointer.offset += data.length + 4;
    +          return true;
    +        }
    +      } while (nextSegment());
    +
    +      entry = null;
    +      return false;
    +    }
    +
    +    @Override
    +    public T get()
    +    {
    +      return entry;
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem WAL Writer.
    +   * @param <T> type of tuple
    +   */
    +  public static class FileSystemWALWriter<T> implements WAL.WALWriter<T>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +    private transient DataOutputStream outputStream;
    +
    +    //windowId => Latest part which can be finalized.
    +    private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
    +
    +    //part => tmp file path;
    +    private final Map<Integer, String> tmpFiles = new TreeMap<>();
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALWriter()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALWriter(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
    +      recover();
    +      if (outputStream == null) {
    +        outputStream = getOutputStream(currentPointer);
    +      }
    +    }
    +
    +    private void recover() throws IOException
    +    {
    +      LOG.debug("current pointer {}", currentPointer);
    +      String tmpFilePath = tmpFiles.get(currentPointer.getPartNum());
    +      if (tmpFilePath != null) {
    +
    +        Path tmpPath = new Path(tmpFilePath);
    +        if (fileContext.util().exists(tmpPath)) {
    +          LOG.debug("tmp path exists {}", tmpPath);
    +
    +          outputStream = getOutputStream(currentPointer);
    +          DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
    +
    +          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
    +
    +          outputStream.flush();
    +          //remove old tmp
    +          inputStreamOldTmp.close();
    +          LOG.debug("delete tmp {}", tmpPath);
    +          fileContext.delete(tmpPath, true);
    +        }
    +      }
    +
    +      //find all valid path names
    +      Set<String> validPathNames = new HashSet<>();
    +      for (Map.Entry<Integer, String> entry : tmpFiles.entrySet()) {
    +        if (entry.getKey() <= currentPointer.partNum) {
    +          validPathNames.add(new Path(entry.getValue()).getName());
    +        }
    +      }
    +      LOG.debug("valid names {}", validPathNames);
    +
    +      //there can be a failure just between the flush and the actual checkpoint which can leave some stray tmp files
    +      //which aren't accounted by tmp files map
    +      Path walPath = new Path(fileSystemWAL.filePath);
    +      Path parentWAL = walPath.getParent();
    +      if (parentWAL != null && fileContext.util().exists(parentWAL)) {
    +        RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
    +        while (remoteIterator.hasNext()) {
    +          FileStatus status = remoteIterator.next();
    +          String fileName = status.getPath().getName();
    +          if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
    +              !validPathNames.contains(fileName)) {
    +            LOG.debug("delete stray tmp {}", status.getPath());
    +            fileContext.delete(status.getPath(), true);
    +          }
    +
    +        }
    +      }
    +
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (outputStream != null) {
    +        flush();
    +        outputStream.close();
    +        outputStream = null;
    +        LOG.debug("closed {}", currentPointer.partNum);
    +      }
    +    }
    +
    +    @Override
    +    public int append(T entry) throws IOException
    +    {
    +      byte[] slice = fileSystemWAL.serde.serialize(entry);
    +      int entryLength = slice.length + 4;
    +
    +      // rotate if needed
    +      if (shouldRotate(entryLength)) {
    +        rotate(true);
    +      }
    +
    +      outputStream.writeInt(slice.length);
    +      outputStream.write(slice);
    +      currentPointer.offset += entryLength;
    +
    +      if (currentPointer.offset == fileSystemWAL.maxLength) {
    +        //if the file is completed then we can rotate it. do not have to wait for next entry
    +        rotate(false);
    +      }
    +
    +      return entryLength;
    +    }
    +
    +    protected void flush() throws IOException
    +    {
    +      if (outputStream != null) {
    +        outputStream.flush();
    +        if (outputStream instanceof FSDataOutputStream) {
    +          ((FSDataOutputStream)outputStream).hflush();
    --- End diff --
    
    Please take a look at the new changes. I had to make some changes in order to read from temporary files: on the local file system, data is not available to readers until the stream is closed.
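The visibility problem described above can be reproduced with a plain java.io analogue. This is a sketch, not Hadoop's FileContext API. Entries are framed as a 4-byte length followed by the payload, as in `append` in the diff, and a separate reader only sees bytes that have actually reached the file. A `BufferedOutputStream` stands in for whatever buffering the file system layer does, which is an assumption; the real Hadoop local file system behaviour may differ. The class `WalVisibilityDemo` and its helpers are hypothetical names.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class WalVisibilityDemo
{
  // Writes one length-prefixed entry: a 4-byte length followed by the payload.
  static void append(DataOutputStream out, byte[] payload) throws IOException
  {
    out.writeInt(payload.length);
    out.write(payload);
  }

  // Counts the complete entries that an independent reader can currently see in the file.
  static int countVisibleEntries(Path file) throws IOException
  {
    long fileLength = Files.size(file);
    long offset = 0;
    int count = 0;
    try (DataInputStream in = new DataInputStream(new BufferedInputStream(Files.newInputStream(file)))) {
      while (offset + 4 <= fileLength) {
        int len = in.readInt();
        if (offset + 4 + len > fileLength) {
          break; // partial entry: stop at the visible end of the file
        }
        in.readFully(new byte[len]);
        offset += 4 + len;
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) throws IOException
  {
    Path file = Files.createTempFile("wal_part_0", ".tmp");
    try (DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(Files.newOutputStream(file)))) {
      append(out, "a".getBytes(StandardCharsets.UTF_8));
      append(out, "bc".getBytes(StandardCharsets.UTF_8));
      // Both entries are still in the writer's buffer, so the reader sees nothing yet.
      System.out.println("before flush: " + countVisibleEntries(file)); // prints "before flush: 0"
      out.flush();
      System.out.println("after flush: " + countVisibleEntries(file));  // prints "after flush: 2"
    } finally {
      Files.deleteIfExists(file);
    }
  }
}
```

This is why the writer's `flush()` in the diff calls `hflush()` on an `FSDataOutputStream`: the reader can only consume what the underlying stream has made visible.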



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60677017
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,688 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A WAL implementation that is file based.
    + * <p/>
    + * Note:<br/>
    + * The FileSystem Writer and Reader operations should not alternate because intermingling these operations will cause
    + * problems. Typically the WAL Reader will only be used in recovery.<br/>
    + *
    + * Also this implementation is not thread-safe: the filesystem WAL writer and reader operations should be performed in
    + * the operator's thread.
    + */
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final Map<Integer, String> tempPartFiles = new TreeMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the File System WAL reader. This can be used to override the default WAL reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   *
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    --- End diff --
    
    Integer.compare would keep this short :-)
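The suggestion amounts to comparing the part number first and falling back to the offset only on a tie. A minimal sketch, using a hypothetical stand-alone class `WalPointerSketch` rather than the PR's `FileSystemWALPointer` (which keeps `offset` mutable):

```java
public final class WalPointerSketch implements Comparable<WalPointerSketch>
{
  private final int partNum;
  private final long offset;

  public WalPointerSketch(int partNum, long offset)
  {
    this.partNum = partNum;
    this.offset = offset;
  }

  @Override
  public int compareTo(WalPointerSketch o)
  {
    // Order by part number first, then by byte offset within the same part.
    int byPart = Integer.compare(partNum, o.partNum);
    return byPart != 0 ? byPart : Long.compare(offset, o.offset);
  }

  public int getPartNum()
  {
    return partNum;
  }

  public long getOffset()
  {
    return offset;
  }
}
```

`Integer.compare` and `Long.compare` also avoid the overflow risk of returning a subtraction. If pointers are used as keys, `equals` and `hashCode` should be kept consistent with this ordering.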



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60631541
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    --- End diff --
    
    Yes, because we don't make assumptions about the FS implementation.
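How `maxLength` drives part files can be sketched in plain Java. When it is 0, `setup()` falls back to the file system's default block size. The writer then rotates to a new part, and `append` rotates eagerly once a part is exactly full. The body of `shouldRotate` is not shown in the diff, so the rule below is an assumption: rotate when a non-empty part would grow past `maxLength`. `PartRotationSketch` and its methods are hypothetical names.

```java
public final class PartRotationSketch
{
  // Configured max part length, or the block size when it is left at 0 (as in setup()).
  static long effectiveMaxLength(long configuredMaxLength, long blockSize)
  {
    return configuredMaxLength == 0 ? blockSize : configuredMaxLength;
  }

  // Assumed rule: start a new part if this entry would push a non-empty part past maxLength.
  static boolean shouldRotate(long currentOffset, int entryLength, long maxLength)
  {
    return currentOffset > 0 && currentOffset + entryLength > maxLength;
  }

  // Simulates appends and returns the part number each entry is written to.
  static int[] assignParts(int[] payloadSizes, long maxLength)
  {
    int[] parts = new int[payloadSizes.length];
    int part = 0;
    long offset = 0;
    for (int i = 0; i < payloadSizes.length; i++) {
      int entryLength = payloadSizes[i] + 4; // 4-byte length prefix plus payload
      if (shouldRotate(offset, entryLength, maxLength)) {
        part++;
        offset = 0;
      }
      parts[i] = part;
      offset += entryLength; // an oversized entry still goes into its own part
      if (offset == maxLength) {
        // the part is exactly full, so rotate without waiting for the next entry
        part++;
        offset = 0;
      }
    }
    return parts;
  }
}
```

With `maxLength` 16 and payloads of 4, 4, 10 and 2 bytes, the entries land in parts 0, 0, 1 and 2.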



[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60353719
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,594 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.ConcurrentSkipListMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.fs.Syncable;
    +import org.apache.hadoop.fs.local.LocalFs;
    +import org.apache.hadoop.fs.local.RawLocalFs;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +import com.datatorrent.netlet.util.Slice;
    +
    +public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    +
    +  //part => tmp file path;
    +  private final ConcurrentSkipListMap<Integer, String> tempPartFiles = new ConcurrentSkipListMap<>();
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALWriter.open(fileContext);
    +      fileSystemWALReader.open(fileContext);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the File System WAL reader. This can be used to override the default WAL reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   *
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   */
    +  public static class FileSystemWALReader implements WAL.WALReader<FileSystemWALPointer>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        close();
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Preconditions.checkArgument(inputStream == null, "input stream not null");
    +      Path pathToReadFrom;
    +      String tmpPath = fileSystemWAL.tempPartFiles.get(walPointer.getPartNum());
    +      if (tmpPath != null) {
    +        pathToReadFrom = new Path(tmpPath);
    +      } else {
    +        pathToReadFrom = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      }
    +
    +      LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer);
    +      if (fileContext.util().exists(pathToReadFrom)) {
    +        DataInputStream stream = fileContext.open(pathToReadFrom);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = pathToReadFrom;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public Slice next() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null && !fileContext.util().exists(currentOpenPath)) {
    --- End diff --
    
    Ok will do that. Was trying to minimize the overlap between reader and writer.
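For context on the pointer that both the reader and the writer track: in the diff above, FileSystemWALPointer.compareTo orders pointers by part number first and by offset within the part second, so any position in a later part file sorts after every position in an earlier one. The same ordering can be expressed with java.util.Comparator. This is a minimal sketch only; WalPointerSketch is a hypothetical stand-in for the real pointer class, not code from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for FileSystemWALPointer; not the Malhar class itself. */
final class WalPointerSketch {
  final int partNum;
  final long offset;

  WalPointerSketch(int partNum, long offset) {
    this.partNum = partNum;
    this.offset = offset;
  }

  /** Part number first, then offset within the part: the order compareTo() implements. */
  static final Comparator<WalPointerSketch> ORDER =
      Comparator.<WalPointerSketch>comparingInt(p -> p.partNum)
          .thenComparingLong(p -> p.offset);

  @Override
  public String toString() {
    return "(" + partNum + "," + offset + ")";
  }

  public static void main(String[] args) {
    List<WalPointerSketch> pointers = new ArrayList<>(Arrays.asList(
        new WalPointerSketch(1, 0), new WalPointerSketch(0, 500), new WalPointerSketch(0, 20)));
    pointers.sort(ORDER);
    System.out.println(pointers); // prints [(0,20), (0,500), (1,0)]
  }
}
```

A comparator built this way avoids the hand-written chain of if statements while keeping the part-then-offset semantics.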

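getInputStream() in the diff above positions the stream with a single stream.skip(walPointer.offset) call. The java.io.InputStream contract allows skip() to skip fewer bytes than requested (including 0), so code that needs an exact position usually loops until the whole offset is consumed. A minimal sketch of such a loop follows; skipFully is a hypothetical helper, not an existing Malhar or Hadoop API. On Java 12+, InputStream.skipNBytes(n) provides the same guarantee, and since FileContext.open(...) returns a Hadoop FSDataInputStream, its seek(offset) may be the simpler option there.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper class; not part of Malhar or Hadoop. */
final class SkipFullySketch {
  /** Skips exactly n bytes, or throws EOFException if the stream ends first. */
  static void skipFully(InputStream in, long n) throws IOException {
    long remaining = n;
    while (remaining > 0) {
      long skipped = in.skip(remaining);
      if (skipped > 0) {
        remaining -= skipped;
      } else if (in.read() >= 0) {
        // skip() made no progress; reading one byte tells a stall apart from end of stream
        remaining--;
      } else {
        throw new EOFException("stream ended with " + remaining + " bytes left to skip");
      }
    }
  }

  public static void main(String[] args) throws IOException {
    InputStream in = new ByteArrayInputStream(new byte[] {10, 20, 30, 40});
    skipFully(in, 3);
    System.out.println(in.read()); // prints 40
  }
}
```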


[GitHub] incubator-apex-malhar pull request: APEXMALHAR-1965 WAL Utility [F...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60258305
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
    @@ -0,0 +1,598 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.wal;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.apex.malhar.lib.utils.FileContextUtils;
    +import org.apache.apex.malhar.lib.utils.IOUtils;
    +import org.apache.apex.malhar.lib.utils.Serde;
    +import org.apache.hadoop.fs.CreateFlag;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileContext;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.Options;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.annotation.Stateless;
    +
    +public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
    +{
    +  @NotNull
    +  private Serde<T, byte[]> serde;
    +
    +  @NotNull
    +  private String filePath;
    +
    +  //max length of the file
    +  @Min(0)
    +  private long maxLength;
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
    +
    +  @NotNull
    +  private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
    +
    +  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
    +
    +  @Override
    +  public void setup()
    +  {
    +    try {
    +      FileContext fileContext = FileContextUtils.getFileContext(filePath);
    +      if (maxLength == 0) {
    +        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
    +      }
    +      fileSystemWALReader.open(fileContext);
    +      fileSystemWALWriter.open(fileContext);
    +
    +    } catch (IOException e) {
    +      throw new RuntimeException("during setup", e);
    +    }
    +  }
    +
    +  @Override
    +  public void beforeCheckpoint(long window)
    +  {
    +    try {
    +      lastCheckpointedWindow = window;
    +      fileSystemWALWriter.flush();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during before cp", e);
    +    }
    +  }
    +
    +  @Override
    +  public void committed(long window)
    +  {
    +    try {
    +      fileSystemWALWriter.finalizeFiles(window);
    +    } catch (IOException e) {
    +      throw new RuntimeException("during committed", e);
    +    }
    +  }
    +
    +  @Override
    +  public void teardown()
    +  {
    +    try {
    +      fileSystemWALReader.close();
    +      fileSystemWALWriter.close();
    +    } catch (IOException e) {
    +      throw new RuntimeException("during teardown", e);
    +    }
    +  }
    +
    +  protected long getLastCheckpointedWindow()
    +  {
    +    return lastCheckpointedWindow;
    +  }
    +
    +  protected String getPartFilePath(int partNumber)
    +  {
    +    return filePath + "_" + partNumber;
    +  }
    +
    +  @Override
    +  public FileSystemWALReader<T> getReader()
    +  {
    +    return fileSystemWALReader;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Reader. This can be used to override the default wal reader.
    +   *
    +   * @param fileSystemWALReader wal reader.
    +   */
    +  public void setFileSystemWALReader(@NotNull FileSystemWALReader<T> fileSystemWALReader)
    +  {
    +    this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
    +  }
    +
    +  @Override
    +  public FileSystemWALWriter<T> getWriter()
    +  {
    +    return fileSystemWALWriter;
    +  }
    +
    +  /**
    +   * Sets the File System WAL Writer. This can be used to override the default wal writer.
    +   * @param fileSystemWALWriter wal writer.
    +   */
    +  public void setFileSystemWALWriter(@NotNull FileSystemWALWriter<T> fileSystemWALWriter)
    +  {
    +    this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
    +  }
    +
    +  /**
    +   * @return WAL Entry serde
    +   */
    +  public Serde<T, byte[]> getSerde()
    +  {
    +    return serde;
    +  }
    +
    +  /**
    +   * Sets the serde which is used for wal entry serialization and de-serialization
    +   *
    +   * @param serde serializer/deserializer
    +   */
    +  public void setSerde(@NotNull Serde<T, byte[]> serde)
    +  {
    +    this.serde = Preconditions.checkNotNull(serde, "serde");
    +  }
    +
    +  /**
    +   * @return WAL file path
    +   */
    +  public String getFilePath()
    +  {
    +    return filePath;
    +  }
    +
    +  /**
    +   * Sets the WAL file path.
    +   *
    +   * @param filePath WAL file path
    +   */
    +  public void setFilePath(@NotNull String filePath)
    +  {
    +    this.filePath = Preconditions.checkNotNull(filePath, "filePath");
    +  }
    +
    +  /**
    +   * @return max length of a WAL part file.
    +   */
    +  public long getMaxLength()
    +  {
    +    return maxLength;
    +  }
    +
    +  /**
    +   * Sets the maximum length of a WAL part file.
    +   *
    +   * @param maxLength max length of the WAL part file
    +   */
    +  public void setMaxLength(long maxLength)
    +  {
    +    this.maxLength = maxLength;
    +  }
    +
    +  public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
    +  {
    +    private final int partNum;
    +    private long offset;
    +
    +    private FileSystemWALPointer()
    +    {
    +      //for kryo
    +      partNum = -1;
    +    }
    +
    +    public FileSystemWALPointer(long offset)
    +    {
    +      this(0, offset);
    +    }
    +
    +    public FileSystemWALPointer(int partNum, long offset)
    +    {
    +      this.partNum = partNum;
    +      this.offset = offset;
    +    }
    +
    +    @Override
    +    public int compareTo(@NotNull FileSystemWALPointer o)
    +    {
    +      if (this.partNum < o.partNum) {
    +        return -1;
    +      }
    +      if (this.partNum > o.partNum) {
    +        return 1;
    +      }
    +      if (this.offset < o.offset) {
    +        return -1;
    +      }
    +      if (this.offset > o.offset) {
    +        return 1;
    +      }
    +      return 0;
    +    }
    +
    +    public int getPartNum()
    +    {
    +      return partNum;
    +    }
    +
    +    public long getOffset()
    +    {
    +      return offset;
    +    }
    +
    +    @Override
    +    public String toString()
    +    {
    +      return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem Wal Reader
    +   * @param <T> type of tuple.
    +   */
    +  public static class FileSystemWALReader<T> implements WAL.WALReader<T, FileSystemWALPointer>
    +  {
    +    private T entry;
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +
    +    private transient DataInputStream inputStream;
    +    private transient Path currentOpenPath;
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALReader()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALReader(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
    +      //initialize the input stream
    +      inputStream = getInputStream(currentPointer);
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +    }
    +
    +    @Override
    +    public void seek(FileSystemWALPointer pointer) throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +      }
    +      inputStream = getInputStream(pointer);
    +      Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
    +      currentPointer = pointer;
    +    }
    +
    +    /**
    +     * Move to the next WAL segment.
    +     *
    +     * @return true if the next part file exists and is opened; false otherwise.
    +     * @throws IOException
    +     */
    +    private boolean nextSegment() throws IOException
    +    {
    +      if (inputStream != null) {
    +        inputStream.close();
    +        inputStream = null;
    +      }
    +
    +      currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    +      inputStream = getInputStream(currentPointer);
    +
    +      return inputStream != null;
    +    }
    +
    +    private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
    +    {
    +      Path walPartPath = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
    +      if (fileContext.util().exists(walPartPath)) {
    +        DataInputStream stream = fileContext.open(walPartPath);
    +        if (walPointer.offset > 0) {
    +          stream.skip(walPointer.offset);
    +        }
    +        currentOpenPath = walPartPath;
    +        return stream;
    +      }
    +      return null;
    +    }
    +
    +    @Override
    +    public boolean advance() throws IOException
    +    {
    +      do {
    +        if (inputStream == null) {
    +          inputStream = getInputStream(currentPointer);
    +        }
    +
    +        if (inputStream != null &&
    +            currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
    +          int len = inputStream.readInt();
    +          Preconditions.checkState(len >= 0, "negative length");
    +
    +          byte[] data = new byte[len];
    +          inputStream.readFully(data);
    +
    +          entry = fileSystemWAL.serde.deserialize(data);
    +          currentPointer.offset += data.length + 4;
    +          return true;
    +        }
    +      } while (nextSegment());
    +
    +      entry = null;
    +      return false;
    +    }
    +
    +    @Override
    +    public T get()
    +    {
    +      return entry;
    +    }
    +  }
    +
    +  /**
    +   * A FileSystem WAL Writer.
    +   * @param <T> type of tuple
    +   */
    +  public static class FileSystemWALWriter<T> implements WAL.WALWriter<T>
    +  {
    +    private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
    +    private transient DataOutputStream outputStream;
    +
    +    //windowId => Latest part which can be finalized.
    +    private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
    +
    +    //part => tmp file path;
    +    private final Map<Integer, String> tmpFiles = new TreeMap<>();
    +
    +    private final FileSystemWAL<T> fileSystemWAL;
    +    private transient FileContext fileContext;
    +
    +    private FileSystemWALWriter()
    +    {
    +      //for kryo
    +      fileSystemWAL = null;
    +    }
    +
    +    public FileSystemWALWriter(@NotNull FileSystemWAL<T> fileSystemWal)
    +    {
    +      this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    +    }
    +
    +    protected void open(@NotNull FileContext fileContext) throws IOException
    +    {
    +      this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
    +      recover();
    +      if (outputStream == null) {
    +        outputStream = getOutputStream(currentPointer);
    +      }
    +    }
    +
    +    private void recover() throws IOException
    +    {
    +      LOG.debug("current pointer {}", currentPointer);
    +      String tmpFilePath = tmpFiles.get(currentPointer.getPartNum());
    +      if (tmpFilePath != null) {
    +
    +        Path tmpPath = new Path(tmpFilePath);
    +        if (fileContext.util().exists(tmpPath)) {
    +          LOG.debug("tmp path exists {}", tmpPath);
    +
    +          outputStream = getOutputStream(currentPointer);
    +          DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
    +
    +          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
    +
    +          outputStream.flush();
    +          //remove old tmp
    +          inputStreamOldTmp.close();
    +          LOG.debug("delete tmp {}", tmpPath);
    +          fileContext.delete(tmpPath, true);
    +        }
    +      }
    +
    +      //find all valid path names
    +      Set<String> validPathNames = new HashSet<>();
    +      for (Map.Entry<Integer, String> entry : tmpFiles.entrySet()) {
    +        if (entry.getKey() <= currentPointer.partNum) {
    +          validPathNames.add(new Path(entry.getValue()).getName());
    +        }
    +      }
    +      LOG.debug("valid names {}", validPathNames);
    +
    +      //there can be a failure just between the flush and the actual checkpoint which can leave some stray tmp files
    +      //which aren't accounted for in the tmp files map
    +      Path walPath = new Path(fileSystemWAL.filePath);
    +      Path parentWAL = walPath.getParent();
    +      if (parentWAL != null && fileContext.util().exists(parentWAL)) {
    +        RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
    +        while (remoteIterator.hasNext()) {
    +          FileStatus status = remoteIterator.next();
    +          String fileName = status.getPath().getName();
    +          if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
    +              !validPathNames.contains(fileName)) {
    +            LOG.debug("delete stray tmp {}", status.getPath());
    +            fileContext.delete(status.getPath(), true);
    +          }
    +
    +        }
    +      }
    +
    +    }
    +
    +    protected void close() throws IOException
    +    {
    +      if (outputStream != null) {
    +        flush();
    +        outputStream.close();
    +        outputStream = null;
    +        LOG.debug("closed {}", currentPointer.partNum);
    +      }
    +    }
    +
    +    @Override
    +    public int append(T entry) throws IOException
    +    {
    +      byte[] slice = fileSystemWAL.serde.serialize(entry);
    +      int entryLength = slice.length + 4;
    +
    +      // rotate if needed
    +      if (shouldRotate(entryLength)) {
    +        rotate(true);
    +      }
    +
    +      outputStream.writeInt(slice.length);
    +      outputStream.write(slice);
    +      currentPointer.offset += entryLength;
    +
    +      if (currentPointer.offset == fileSystemWAL.maxLength) {
    +        //if the file is completed then we can rotate it. do not have to wait for next entry
    +        rotate(false);
    +      }
    +
    +      return entryLength;
    +    }
    +
    +    protected void flush() throws IOException
    +    {
    +      if (outputStream != null) {
    +        outputStream.flush();
    +        if (outputStream instanceof FSDataOutputStream) {
    +          ((FSDataOutputStream)outputStream).hflush();
    --- End diff --
    
    Reminder on why we need both calls?
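One plausible reading of the two calls: flush() is declared on every DataOutputStream and pushes bytes held by any Java-side buffering wrapper down to the underlying stream, while hflush() exists only on Hadoop's FSDataOutputStream (through the Syncable interface) and flushes the client's buffered data so that new readers can see it. Because outputStream is typed as a plain DataOutputStream, the unconditional flush() also covers writers whose stream is not an FSDataOutputStream. The sketch below, using only java.io and a hypothetical demo class, illustrates just the first half: a DataOutputStream layered over a BufferedOutputStream keeps a small record in memory until flush() is called.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical demo class: shows Java-side buffering that flush() drains. */
final class BufferedFlushDemo {
  /** Returns {bytes in the sink before flush(), bytes in the sink after flush()}. */
  static int[] bytesVisibleBeforeAndAfterFlush(byte[] record) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the file
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(sink));
    out.writeInt(record.length); // 4-byte length prefix, as append() writes
    out.write(record);
    int before = sink.size();    // small writes are still held by BufferedOutputStream
    out.flush();                 // DataOutputStream.flush() forwards to BufferedOutputStream.flush()
    int after = sink.size();
    out.close();
    return new int[] {before, after};
  }

  public static void main(String[] args) throws IOException {
    int[] sizes = bytesVisibleBeforeAndAfterFlush(new byte[] {1, 2, 3});
    System.out.println(sizes[0] + " -> " + sizes[1]); // prints "0 -> 7"
  }
}
```

The reader-visibility step that hflush() adds on HDFS is not modeled here.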

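The writer's append() and the reader's advance() in the diff above agree on a simple length-prefixed record layout: a 4-byte length written with writeInt, followed by the serialized bytes, with the pointer offset advanced by length + 4 per record. A minimal round-trip sketch over in-memory streams, assuming plain byte arrays in place of the Serde output; LengthPrefixedFraming is a hypothetical class, not part of the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the length-prefixed record format; not Malhar code. */
final class LengthPrefixedFraming {
  /** Writes one record and returns its size on disk (payload + 4-byte length). */
  static int append(DataOutputStream out, byte[] payload) throws IOException {
    out.writeInt(payload.length);
    out.write(payload);
    return payload.length + 4;
  }

  /** Reads records until fileLength bytes are consumed, tracking the offset like advance(). */
  static List<byte[]> readAll(DataInputStream in, long fileLength) throws IOException {
    List<byte[]> records = new ArrayList<>();
    long offset = 0;
    while (offset < fileLength) {
      int len = in.readInt();
      if (len < 0) {
        throw new IllegalStateException("negative length");
      }
      byte[] data = new byte[len];
      in.readFully(data);
      records.add(data);
      offset += len + 4;
    }
    return records;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream file = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(file);
    int written = append(out, "a".getBytes(StandardCharsets.UTF_8));
    written += append(out, "bcd".getBytes(StandardCharsets.UTF_8));
    out.flush();

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(file.toByteArray()));
    List<byte[]> records = readAll(in, file.size());
    System.out.println(written + " bytes, " + records.size() + " records"); // prints "12 bytes, 2 records"
  }
}
```

Because each record carries its own length, a reader can resume from any pointer that lands on a record boundary, which is what the (partNum, offset) pointer stores.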
