Posted to user@spark.apache.org by automaticgiant <hu...@rackspace.com> on 2015/07/13 22:44:27 UTC

fileStream with old files

It's not as odd as it sounds. I want to ensure that, after a long outage, a
streaming job can recover all the files that went into a directory while the
job was down.
I've looked at
http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
and
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
and
https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
, but all seem unhelpful.
I've tested combinations of the following:
 * fileStreams created with trivial accept-all filters,
 * newFilesOnly set to true and to false,
 * minRememberDuration set to high and low values,
 * input on HDFS and in a local directory.
The problem is that it will not read files in the directory that are more
than a minute old.
This is how I create the stream:
JavaPairInputDStream<LongWritable, Text> input = context.fileStream(indir,
        LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);
I also tried setting the following, with both large and small values:
context.sparkContext().getConf().set("spark.streaming.minRememberDuration",
        "1654564");

Are there known limitations of newFilesOnly=false? Am I doing something
wrong?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/fileStream-with-old-files-tp23802.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: fileStream with old files

Posted by Terry Hole <hu...@gmail.com>.
Hi, Hunter,

What behavior do you see with HDFS? The local file system and HDFS
should have the same behavior.

Thanks!
- Terry

On Thu, Jul 16, 2015 at 2:04 AM, Hunter Morgan <Hu...@rackspace.com> wrote:

>  After moving the setting of the parameter to SparkConf initialization
> instead of after the context is already initialized, I have it operating
> reliably on local filesystem, but not on hdfs. Are there any differences in
> behavior between these two cases I should be aware of?

RE: fileStream with old files

Posted by Hunter Morgan <Hu...@rackspace.com>.
After moving the parameter setting into the SparkConf initialization, instead of setting it after the context is already initialized, I have it operating reliably on the local filesystem, but not on HDFS. Are there any differences in behavior between these two cases I should be aware of?
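
A possible explanation for why the earlier attempt had no effect: as far as I know, SparkContext copies the SparkConf it is given, and getConf() returns another copy, so a set() on the returned object never reaches the running context. The sketch below models that copy-on-read behavior in plain Java. The Conf and Context classes are hypothetical stand-ins, not Spark's classes, and the values are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfSnapshotSketch {
    /** Hypothetical stand-in for a configuration object (not Spark's SparkConf). */
    static final class Conf {
        private final Map<String, String> settings = new HashMap<>();

        Conf set(String key, String value) {
            settings.put(key, value);
            return this;
        }

        String get(String key, String defaultValue) {
            return settings.getOrDefault(key, defaultValue);
        }

        Conf copy() {
            Conf c = new Conf();
            c.settings.putAll(settings);
            return c;
        }
    }

    /** Hypothetical stand-in for a context that snapshots its configuration. */
    static final class Context {
        private final Conf conf;

        Context(Conf conf) {
            this.conf = conf.copy();   // snapshot taken at construction time
        }

        Conf getConf() {
            return conf.copy();        // callers only ever see a copy
        }

        String effective(String key, String defaultValue) {
            return conf.get(key, defaultValue);
        }
    }

    public static void main(String[] args) {
        String key = "spark.streaming.minRememberDuration";

        // Setting the value after construction only changes a throwaway copy.
        Context late = new Context(new Conf());
        late.getConf().set(key, "600");
        System.out.println("set after construction:  " + late.effective(key, "60"));

        // Setting the value before construction is what the context actually sees.
        Context early = new Context(new Conf().set(key, "600"));
        System.out.println("set before construction: " + early.effective(key, "60"));
    }
}
```

In the real application the equivalent is to call set() on the SparkConf before passing it to the JavaStreamingContext constructor, as the test further down does.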

I don’t usually mailinglist or exchange, so forgive me for my ignorance of whether this message will go horribly wrong due to formatting.

I plan to port the following code to the Hadoop FS API to generalize the test, so I can understand the actual behavior and verify the desired behavior.

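// Imports for the helper and the test below (assumes Spark 1.x, Hadoop's new
// mapreduce API, JUnit 4, and Hamcrest on the classpath).
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

// Helper in SparkUtil: accept every file and pass newFilesOnly = false.
public static JavaDStream<String> textFileStreamIncludingExisting(JavaStreamingContext context, String path)
{
    return context.fileStream(path, LongWritable.class, Text.class, TextInputFormat.class, v1 -> true, false)
            .map(v1 -> v1._2().toString());
}

@Test
public void testTextFileStreamIncludingExistingReadsOldFiles() throws Exception
{
    final Path testDir = Files.createTempDirectory("sparkTest");
    final List<Path> tempFiles = new ArrayList<>();

    // create 20 "old" files
    final int testFileNumberLimit = 20;
    for (int testFileNumber = 0; testFileNumber < testFileNumberLimit; testFileNumber++)
    {
        final Path testFile = Files.createTempFile(testDir, "testFile", "");
        tempFiles.add(testFile);
        // try-with-resources flushes and closes the writer
        try (FileWriter fileWriter = new FileWriter(testFile.toFile()))
        {
            fileWriter.write("asdf");
        }
        // set file dates 0 to 19 days ago
        final FileTime fileTime = FileTime.from(Instant.now().minus(Duration.ofDays(testFileNumber)));
        for (String eachAttribute : new String[]{"basic:lastAccessTime", "basic:lastModifiedTime",
                "basic:creationTime"})
        {
            Files.setAttribute(testFile, eachAttribute, fileTime);
        }
    }

    // the remember duration must be set on the SparkConf before the context is created
    final SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("test");
    sparkConf.set("spark.streaming.minRememberDuration", String.valueOf(Integer.MAX_VALUE));
    final JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    final JavaDStream<String> input = SparkUtil.textFileStreamIncludingExisting(context,
            String.valueOf(testDir.toUri()));
    // count lines read (each file holds one line, so this equals the file count)
    final Accumulator<Integer> accumulator = context.sparkContext().accumulator(0);

    // setup async wait: no permits until an empty batch is seen
    final Semaphore done = new Semaphore(0);
    input.foreachRDD(new Function<JavaRDD<String>, Void>()
    {
        @Override
        public Void call(JavaRDD<String> v1) throws Exception
        {
            final long count = v1.count(); // run the count job only once per batch
            if (count == 0)
            {
                done.release();
            }
            accumulator.add((int) count);
            return null;
        }
    });
    context.start();
    // wait for completion or 20 sec
    done.tryAcquire(20, TimeUnit.SECONDS);
    context.stop();

    assertThat(accumulator.value(), is(testFileNumberLimit));

    for (Path eachTempFile : tempFiles)
    {
        Files.deleteIfExists(eachTempFile);
    }
    Files.deleteIfExists(testDir);
}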


From: Tathagata Das [mailto:tdas@databricks.com]
Sent: Wednesday, July 15, 2015 00:01
To: Terry Hole
Cc: Hunter Morgan; user@spark.apache.org
Subject: Re: fileStream with old files

It was added, but it's not documented publicly. I am planning to change the name of the conf to spark.streaming.fileStream.minRememberDuration to make it easier to understand.



Re: fileStream with old files

Posted by Tathagata Das <td...@databricks.com>.
It was added, but it's not documented publicly. I am planning to change the
name of the conf to spark.streaming.fileStream.minRememberDuration to make
it easier to understand.

On Mon, Jul 13, 2015 at 9:43 PM, Terry Hole <hu...@gmail.com> wrote:

> A new configuration named *spark.streaming.minRememberDuration* was added
> since 1.2.1 to control the file stream input, the default value is *60
> seconds*, you can change this value to a large value to include older
> files (older than 1 minute)
>
> You can get the detail from this jira:
> https://issues.apache.org/jira/browse/SPARK-3276
>
> -Terry

Re: fileStream with old files

Posted by Terry Hole <hu...@gmail.com>.
A new configuration named *spark.streaming.minRememberDuration* has been
available since 1.2.1 to control the file stream input. The default value is
*60 seconds*; you can set it to a larger value to include older files (older
than 1 minute).

You can find the details in this JIRA:
https://issues.apache.org/jira/browse/SPARK-3276
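
To make the effect of the remember duration concrete, here is a minimal plain-Java sketch of how the file stream appears to decide whether a file is too old. It is a simplified model based on my reading of the behavior described above, not Spark's actual code: the class name, method names, and parameters are all hypothetical, and details such as tracking already-processed files are left out.

```java
public class RememberWindowSketch {
    /** Remember duration rounded up to a whole number of batches. */
    static long batchesToRemember(long batchMillis, long minRememberMillis) {
        return (minRememberMillis + batchMillis - 1) / batchMillis;
    }

    /**
     * Files modified at or before the returned time are ignored.
     * With newFilesOnly = true, nothing older than the stream start is read;
     * with newFilesOnly = false, only the remember window limits file age.
     */
    static long ignoreThreshold(long startMillis, long batchTimeMillis, long batchMillis,
                                long minRememberMillis, boolean newFilesOnly) {
        long initialThreshold = newFilesOnly ? startMillis : 0L;
        long windowMillis = batchMillis * batchesToRemember(batchMillis, minRememberMillis);
        return Math.max(initialThreshold, batchTimeMillis - windowMillis);
    }

    /** A file is picked up if it is newer than the threshold and not from the future. */
    static boolean isSelected(long modTimeMillis, long batchTimeMillis, long thresholdMillis) {
        return modTimeMillis > thresholdMillis && modTimeMillis <= batchTimeMillis;
    }

    public static void main(String[] args) {
        long batchMillis = 1_000L;
        long now = System.currentTimeMillis();
        long tenMinutesAgo = now - 600_000L;
        // Compare the default 60 s window with a 1 h window for a 10-minute-old file.
        for (long rememberMillis : new long[]{60_000L, 3_600_000L}) {
            long threshold = ignoreThreshold(now, now, batchMillis, rememberMillis, false);
            System.out.println("remember=" + rememberMillis + "ms selects 10-minute-old file: "
                    + isSelected(tenMinutesAgo, now, threshold));
        }
    }
}
```

Under this model, newFilesOnly=false alone is not enough to read old files: the remember window still cuts off anything older than the configured duration, which matches the one-minute limit reported in the question.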

-Terry
