You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hadoop.apache.org by Dmitry Goldenberg <dg...@gmail.com> on 2016/05/26 03:11:45 UTC

What is the recommended way to append to files on hdfs?

I'm having trouble figuring out a safe way to append to files in HDFS.

I'm using a small, 3-node Hadoop cluster (CDH v.5.3.9 to be specific). Our
process is a data pipeliner which is multi-threaded (8 threads) and it has
a stage which appends lines of delimited text to files in a dedicated
directory on HDFS. I'm using locks to synchronize access of the threads to
the buffered writers which append the data.

My first issue is deciding on the approach generally.

Approach A is to open the file, append to it, then close it for every line
appended. This seems slow and would seem to create too many small blocks,
or at least I see some such sentiment in various posts.

Approach B is to cache the writers but periodically refresh them to make
sure the list of writers doesn't grow unbounded (currently, it's one writer
per each input file processed by the pipeliner). This seems like a more
efficient approach but I imagine having open streams over a period of time
however controlled may be an issue, especially for output file readers (?)

Beyond this, my real issues are two. I am using the FileSystem Java Hadoop
API to do the appending and am intermittently getting these 2 types of
exceptions:

org.apache.hadoop.ipc.RemoteException: failed to create file
/output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for
client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.

org.apache.hadoop.ipc.RemoteException:
BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not
exist or is not under Constructionblk_1073760252_545
40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1,
replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW],
ReplicaUnderConst
ruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}

Anyone have any ideas on either of those?

For the first problem, I've tried instrumenting logic discussed in this post
<http://stackoverflow.com/questions/23833318/crashed-hdfs-client-how-to-close-remaining-open-files>
but
didn't seem to help.

I'm also interested in the role of the dfs.support.append property, if at
all applicable.

The code is more or less as follows, for getting the output stream

userGroupInfo = UserGroupInformation.*createRemoteUser*("hdfs");

Configuration conf = new Configuration();

conf.set(key1, val1);

....

conf.set(keyN, valN);

fileSystem = userGroupInfo.doAs(*new* PrivilegedExceptionAction<FileSystem>()
{

        *public* FileSystem run() *throws* Exception {

            *return* FileSystem.*get*(conf);

        }

      });

*org.apache.hadoop.fs.path.Path file = ...*

*public* OutputStream getOutputStream(*boolean* append) *throws* IOException
{

    OutputStream os = *null*;

    *synchronized* (file) {

      // If the file exists

      *if* (isFile()) {

        // See if we're to append or to overwrite

        os = (append)

          ? fs.append(file) : fs.create(file, *true*);

      }

      // Appending to a non-existent file

      *else* *if* (append) {

        // Create the file first

        // otherwise, "failed to append to non-existent file" exception

        FSDataOutputStream dos = fs.create(file);

        dos.close();

        // Open it for appending

        os = fs.append(file);

      }

      // Creating a new file

      *else* {

        os = fs.create(file);

      }

    }

    *return* os;

  }

Re: What is the recommended way to append to files on hdfs?

Posted by Dmitry Goldenberg <dg...@gmail.com>.
Thanks, John.

Perhaps I didn't write up that part clearly but we don't have input readers
coupled. The data is generated line by line, then directed to threads which
append lines to the files.  Originally, I started with threads not being
dedicated to files.  I've come across
https://issues.apache.org/jira/browse/HDFS-7203 and switched the
implementation to have a single thread executor service with a blocking
queue, one such thing per output file.  This was an attempt to work
around HDFS-7203 so multiple threads don't write to the same file but only
one dedicated thread does.

However, that approach only worked for a smaller data set. With larger data
sets, the "current leaseholder is trying to recreate file" errors came back.

The logic indeed is using the try-with-resources so presumably that should
be OK:

try (BufferedWriter bw = new BufferedWriter(new
OutputStreamWriter(outputFile.getOuputStream(true)))) {
    bw.write(line);
} catch (Exception ex) {
   // ... handle exception ...
}

I included the getOutputStream method in my previous email; including it
below.

Basically, having a single thread dedicated service with a queue didn't
work for me.  I'm trying to have just a single thread handling all appends
now, inefficient as that may be.  We currently have no path to upgrade to
Hadoop 2.6.0 to see if we get better luck with HDFS-7203 having been
fixed...

*org.apache.hadoop.fs.path.Path file = ...*

*public* OutputStream getOutputStream(*boolean* append) *throws* IOException
{

    OutputStream os = *null*;

    *synchronized* (file) {

      // If the file exists

      *if* (isFile()) {

        // See if we're to append or to overwrite

        os = (append)

          ? fs.append(file) : fs.create(file, *true*);

      }

      // Appending to a non-existent file

      *else* *if* (append) {

        // Create the file first

        // otherwise, "failed to append to non-existent file" exception

        FSDataOutputStream dos = fs.create(file);

        dos.close();

        // Open it for appending

        os = fs.append(file);

      }

      // Creating a new file

      *else* {

        os = fs.create(file);

      }

    }

    *return* os;

  }


On Tue, May 31, 2016 at 9:10 AM, John Lilley <jo...@redpoint.net>
wrote:

> Dmitry,
>
>
>
> Regarding your RemoteException issue, you should check that your output
> files are explicitly closed by calling the close() method, otherwise they
> are only closed when the GC gets around to finalizing.  If you can use
> try-with-resources, that is best.  I’ve seen these kinds of issues when one
> writer still has the file open and another writer attempts to append.
>
>
>
> More generally, I recommend keeping the same writers open for longer
> times, especially if the writes tend to be small.  If corresponding readers
> need to see the appended data quickly (I have not tried this myself) the
> FSDataOutputStream.hflush() method is documented to make pending data
> available to readers.
>
>
>
> You should rethink your design here:
>
> “make sure the list of writers doesn't grow unbounded (currently, it's one
> writer per each input file processed by the pipeliner)”
>
> This doesn’t sound like a good design, coupling your input readers
> directly with output writers.  Instead, put the writers in separate threads
> and push byte arrays to be written to them via a queue.
>
>
>
> *John Lilley*
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg123@gmail.com]
> *Sent:* Wednesday, May 25, 2016 9:12 PM
> *To:* user@hadoop.apache.org
> *Subject:* What is the recommended way to append to files on hdfs?
>
>
>
> I'm having trouble figuring out a safe way to append to files in HDFS.
>
> I'm using a small, 3-node Hadoop cluster (CDH v.5.3.9 to be specific). Our
> process is a data pipeliner which is multi-threaded (8 threads) and it has
> a stage which appends lines of delimited text to files in a dedicated
> directory on HDFS. I'm using locks to synchronize access of the threads to
> the buffered writers which append the data.
>
> My first issue is deciding on the approach generally.
>
> Approach A is to open the file, append to it, then close it for every line
> appended. This seems slow and would seem to create too many small blocks,
> or at least I see some such sentiment in various posts.
>
> Approach B is to cache the writers but periodically refresh them to make
> sure the list of writers doesn't grow unbounded (currently, it's one writer
> per each input file processed by the pipeliner). This seems like a more
> efficient approach but I imagine having open streams over a period of time
> however controlled may be an issue, especially for output file readers (?)
>
> Beyond this, my real issues are two. I am using the FileSystem Java Hadoop
> API to do the appending and am intermittently getting these 2 types of
> exceptions:
>
> org.apache.hadoop.ipc.RemoteException: failed to create file
> /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for
> client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.
>
> org.apache.hadoop.ipc.RemoteException:
> BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not
> exist or is not under Constructionblk_1073760252_545
> 40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1,
> replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW],
> ReplicaUnderConst
> ruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}
>
> Anyone have any ideas on either of those?
>
> For the first problem, I've tried instrumenting logic discussed in this
> post
> <http://stackoverflow.com/questions/23833318/crashed-hdfs-client-how-to-close-remaining-open-files> but
> didn't seem to help.
>
> I'm also interested in the role of the dfs.support.append property, if at
> all applicable.
>
> The code is more or less as follows, for getting the output stream
>
> userGroupInfo = UserGroupInformation.*createRemoteUser*("hdfs");
>
> Configuration conf = new Configuration();
>
> conf.set(key1, val1);
>
> ....
>
> conf.set(keyN, valN);
>
> fileSystem = userGroupInfo.doAs(*new* PrivilegedExceptionAction<FileSystem>()
> {
>
>         *public* FileSystem run() *throws* Exception {
>
>             *return* FileSystem.*get*(conf);
>
>         }
>
>       });
>
> *org.apache.hadoop.fs.path.Path file = ...*
>
> *public* OutputStream getOutputStream(*boolean* append) *throws* IOException
> {
>
>     OutputStream os = *null*;
>
>     *synchronized* (file) {
>
>       // If the file exists
>
>       *if* (isFile()) {
>
>         // See if we're to append or to overwrite
>
>         os = (append)
>
>           ? fs.append(file) : fs.create(file, *true*);
>
>       }
>
>       // Appending to a non-existent file
>
>       *else* *if* (append) {
>
>         // Create the file first
>
>         // otherwise, "failed to append to non-existent file" exception
>
>         FSDataOutputStream dos = fs.create(file);
>
>         dos.close();
>
>         // Open it for appending
>
>         os = fs.append(file);
>
>       }
>
>       // Creating a new file
>
>       *else* {
>
>         os = fs.create(file);
>
>       }
>
>     }
>
>     *return* os;
>
>   }
>
>
>
>
>
>
>

Re: What is the recommended way to append to files on hdfs?

Posted by Dmitry Goldenberg <dg...@gmail.com>.
I got file appending working with CDH 5.3 / HDFS 2.5.0. My conclusions so
far are as follows:

   - Cannot have one dedicated thread doing appends per file, or multiple
   threads writing to multiple files, whether we’re writing data via one and
   the same instance of the HDFS API FileSystem, or different instances.
   - Cannot refresh (i.e. close and reopen) the writers; they must stay
   open.
   - This last item leads to occasional relatively rare
   ClosedChannelException which appears to be recoverable (by retrying to
   append).
   - We use a single thread executor service with a blocking queue (one for
   appending to all files); a writer per file, the writers stay open (till the
   end of processing when they’re closed).
   - When we upgrade to CDH newer than 5.3, we’ll want to revisit this and
   see what threading strategy makes sense: one and only thread, one thread
   per file, multiple threads writing to multiple files. Additionally, we’ll
   want to see if writers can be/need to be periodically closed and reopened.

In addition, I have seen the following error as well, and was able to make
it go away by setting
'dfs.client.block.write.replace-datanode-on-failure.policy'
to 'NEVER' on the client side.

java.io.IOException: Failed to replace a bad datanode on the existing
pipeline due to no more good datanodes being available to try. (Nodes:
current=[XXX.XX.XXX.XX:5 <http://168.72.192.72:5/>
0010, XXX.XX.XXX.XX:50010 <http://168.72.192.73:50010/>], original=[XXX
.XX.XXX.XX:50010 <http://168.72.192.73:50010/>, XXX.XX.XXX.XX:50010]). The
current failed datanode replacement policy is DEFAULT, and a client may
configure this via
'dfs.client.block.write.replace-datanode-on-failure.policy' in its
configuration.
       at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:969)
~[hadoop-hdfs-2.5.0.jar:?]
       at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1035)
~[hadoop-hdfs-2.5.0.jar:?]
       at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1184)
~[hadoop-hdfs-2.5.0.jar:?]
       at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:532)
~[hadoop-hdfs-2.5.0.jar:?]

On Tue, May 31, 2016 at 9:10 AM, John Lilley <jo...@redpoint.net>
wrote:

> Dmitry,
>
>
>
> Regarding your RemoteException issue, you should check that your output
> files are explicitly closed by calling the close() method, otherwise they
> are only closed when the GC gets around to finalizing.  If you can use
> try-with-resources, that is best.  I’ve seen these kinds of issues when one
> writer still has the file open and another writer attempts to append.
>
>
>
> More generally, I recommend keeping the same writers open for longer
> times, especially if the writes tend to be small.  If corresponding readers
> need to see the appended data quickly (I have not tried this myself) the
> FSDataOutputStream.hflush() method is documented to make pending data
> available to readers.
>
>
>
> You should rethink your design here:
>
> “make sure the list of writers doesn't grow unbounded (currently, it's one
> writer per each input file processed by the pipeliner)”
>
> This doesn’t sound like a good design, coupling your input readers
> directly with output writers.  Instead, put the writers in separate threads
> and push byte arrays to be written to them via a queue.
>
>
>
> *John Lilley*
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg123@gmail.com]
> *Sent:* Wednesday, May 25, 2016 9:12 PM
> *To:* user@hadoop.apache.org
> *Subject:* What is the recommended way to append to files on hdfs?
>
>
>
> I'm having trouble figuring out a safe way to append to files in HDFS.
>
> I'm using a small, 3-node Hadoop cluster (CDH v.5.3.9 to be specific). Our
> process is a data pipeliner which is multi-threaded (8 threads) and it has
> a stage which appends lines of delimited text to files in a dedicated
> directory on HDFS. I'm using locks to synchronize access of the threads to
> the buffered writers which append the data.
>
> My first issue is deciding on the approach generally.
>
> Approach A is to open the file, append to it, then close it for every line
> appended. This seems slow and would seem to create too many small blocks,
> or at least I see some such sentiment in various posts.
>
> Approach B is to cache the writers but periodically refresh them to make
> sure the list of writers doesn't grow unbounded (currently, it's one writer
> per each input file processed by the pipeliner). This seems like a more
> efficient approach but I imagine having open streams over a period of time
> however controlled may be an issue, especially for output file readers (?)
>
> Beyond this, my real issues are two. I am using the FileSystem Java Hadoop
> API to do the appending and am intermittently getting these 2 types of
> exceptions:
>
> org.apache.hadoop.ipc.RemoteException: failed to create file
> /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for
> client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.
>
> org.apache.hadoop.ipc.RemoteException:
> BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not
> exist or is not under Constructionblk_1073760252_545
> 40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1,
> replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW],
> ReplicaUnderConst
> ruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}
>
> Anyone have any ideas on either of those?
>
> For the first problem, I've tried instrumenting logic discussed in this
> post
> <http://stackoverflow.com/questions/23833318/crashed-hdfs-client-how-to-close-remaining-open-files> but
> didn't seem to help.
>
> I'm also interested in the role of the dfs.support.append property, if at
> all applicable.
>
> The code is more or less as follows, for getting the output stream
>
> userGroupInfo = UserGroupInformation.*createRemoteUser*("hdfs");
>
> Configuration conf = new Configuration();
>
> conf.set(key1, val1);
>
> ....
>
> conf.set(keyN, valN);
>
> fileSystem = userGroupInfo.doAs(*new* PrivilegedExceptionAction<FileSystem>()
> {
>
>         *public* FileSystem run() *throws* Exception {
>
>             *return* FileSystem.*get*(conf);
>
>         }
>
>       });
>
> *org.apache.hadoop.fs.path.Path file = ...*
>
> *public* OutputStream getOutputStream(*boolean* append) *throws* IOException
> {
>
>     OutputStream os = *null*;
>
>     *synchronized* (file) {
>
>       // If the file exists
>
>       *if* (isFile()) {
>
>         // See if we're to append or to overwrite
>
>         os = (append)
>
>           ? fs.append(file) : fs.create(file, *true*);
>
>       }
>
>       // Appending to a non-existent file
>
>       *else* *if* (append) {
>
>         // Create the file first
>
>         // otherwise, "failed to append to non-existent file" exception
>
>         FSDataOutputStream dos = fs.create(file);
>
>         dos.close();
>
>         // Open it for appending
>
>         os = fs.append(file);
>
>       }
>
>       // Creating a new file
>
>       *else* {
>
>         os = fs.create(file);
>
>       }
>
>     }
>
>     *return* os;
>
>   }
>
>
>
>
>
>
>

RE: What is the recommended way to append to files on hdfs?

Posted by John Lilley <jo...@redpoint.net>.
Dmitry,

Regarding your RemoteException issue, you should check that your output files are explicitly closed by calling the close() method, otherwise they are only closed when the GC gets around to finalizing.  If you can use try-with-resources, that is best.  I’ve seen these kinds of issues when one writer still has the file open and another writer attempts to append.

More generally, I recommend keeping the same writers open for longer times, especially if the writes tend to be small.  If corresponding readers need to see the appended data quickly (I have not tried this myself) the FSDataOutputStream.hflush() method is documented to make pending data available to readers.

You should rethink your design here:
“make sure the list of writers doesn't grow unbounded (currently, it's one writer per each input file processed by the pipeliner)”
This doesn’t sound like a good design, coupling your input readers directly with output writers.  Instead, put the writers in separate threads and push byte arrays to be written to them via a queue.

John Lilley

From: Dmitry Goldenberg [mailto:dgoldenberg123@gmail.com]
Sent: Wednesday, May 25, 2016 9:12 PM
To: user@hadoop.apache.org
Subject: What is the recommended way to append to files on hdfs?


I'm having trouble figuring out a safe way to append to files in HDFS.

I'm using a small, 3-node Hadoop cluster (CDH v.5.3.9 to be specific). Our process is a data pipeliner which is multi-threaded (8 threads) and it has a stage which appends lines of delimited text to files in a dedicated directory on HDFS. I'm using locks to synchronize access of the threads to the buffered writers which append the data.

My first issue is deciding on the approach generally.

Approach A is to open the file, append to it, then close it for every line appended. This seems slow and would seem to create too many small blocks, or at least I see some such sentiment in various posts.

Approach B is to cache the writers but periodically refresh them to make sure the list of writers doesn't grow unbounded (currently, it's one writer per each input file processed by the pipeliner). This seems like a more efficient approach but I imagine having open streams over a period of time however controlled may be an issue, especially for output file readers (?)

Beyond this, my real issues are two. I am using the FileSystem Java Hadoop API to do the appending and am intermittently getting these 2 types of exceptions:

org.apache.hadoop.ipc.RemoteException: failed to create file /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.

org.apache.hadoop.ipc.RemoteException: BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not exist or is not under Constructionblk_1073760252_545 40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1, replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW], ReplicaUnderConst ruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}

Anyone have any ideas on either of those?

For the first problem, I've tried instrumenting logic discussed in this post<http://stackoverflow.com/questions/23833318/crashed-hdfs-client-how-to-close-remaining-open-files> but didn't seem to help.

I'm also interested in the role of the dfs.support.append property, if at all applicable.

The code is more or less as follows, for getting the output stream
userGroupInfo = UserGroupInformation.createRemoteUser("hdfs");
Configuration conf = new Configuration();
conf.set(key1, val1);
....
conf.set(keyN, valN);
fileSystem = userGroupInfo.doAs(new PrivilegedExceptionAction<FileSystem>() {
        public FileSystem run() throws Exception {
            return FileSystem.get(conf);
        }
      });
org.apache.hadoop.fs.path.Path file = ...
public OutputStream getOutputStream(boolean append) throws IOException {
    OutputStream os = null;
    synchronized (file) {
      // If the file exists
      if (isFile()) {
        // See if we're to append or to overwrite
        os = (append)
          ? fs.append(file) : fs.create(file, true);
      }
      // Appending to a non-existent file
      else if (append) {
        // Create the file first
        // otherwise, "failed to append to non-existent file" exception
        FSDataOutputStream dos = fs.create(file);
        dos.close();
        // Open it for appending
        os = fs.append(file);
      }
      // Creating a new file
      else {
        os = fs.create(file);
      }
    }
    return os;
  }