Posted to user@spark.apache.org by automaticgiant <hu...@rackspace.com> on 2015/07/13 22:44:27 UTC
fileStream with old files
It's not as odd as it sounds. I want to ensure that long streaming job
outages can recover all the files that went into a directory while the job
was down.
I've looked at
http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
and
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
and
https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
, but all seem unhelpful.
I've tested combinations of the following:
* fileStreams created with dumb accept-all filters
* newFilesOnly true and false,
* tweaking minRememberDuration to high and low values,
* on hdfs or local directory.
The problem is that it will not read files in the directory that are more than
a minute old.
JavaPairInputDStream<LongWritable, Text> input = context.fileStream(indir,
LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);
I also tried setting
context.sparkContext().getConf().set("spark.streaming.minRememberDuration",
"1654564");
to both large and small values.
Are there known limitations of newFilesOnly=false? Am I doing something wrong?
Re: fileStream with old files
Posted by Terry Hole <hu...@gmail.com>.
Hi, Hunter,
What behavior do you see with HDFS? The local file system and HDFS should have the same behavior.
Thanks!
- Terry
RE: fileStream with old files
Posted by Hunter Morgan <Hu...@rackspace.com>.
After moving the setting of the parameter to SparkConf initialization instead of after the context is already initialized, I have it operating reliably on the local filesystem, but not on HDFS. Are there any differences in behavior between these two cases that I should be aware of?
I don't usually use mailing lists or Exchange, so forgive me if this message gets mangled by formatting.
I plan to port the following code to the Hadoop FS API to generalize the testing, so I can understand the actual behavior and verify the desired behavior; a rough sketch of that port follows the test below.
// Imports needed by the helper and the JUnit test below:
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Test;

public static JavaDStream<String> textFileStreamIncludingExisting(JavaStreamingContext context, String path)
{
    // Accept-all filter, newFilesOnly = false, values mapped to their text content.
    return context.fileStream(path, LongWritable.class, Text.class, TextInputFormat.class, v1 -> true, false)
            .map(v1 -> v1._2.toString());
}

@Test
public void testTextFileStreamIncludingExistingReadsOldFiles() throws Exception
{
    final Path testDir = Files.createTempDirectory("sparkTest");
    final ArrayList<Path> tempFiles = new ArrayList<>();

    // create 20 "old" files
    final int testFileNumberLimit = 20;
    for (int testFileNumber = 0; testFileNumber < testFileNumberLimit; testFileNumber++)
    {
        final Path testFile = Files.createTempFile(testDir, "testFile", "");
        tempFiles.add(testFile);
        final FileWriter fileWriter = new FileWriter(testFile.toFile());
        fileWriter.write("asdf");
        fileWriter.flush();
        fileWriter.close();
        for (String eachAttribute : new String[]{"basic:lastAccessTime", "basic:lastModifiedTime",
                "basic:creationTime"})
        { // set file dates 0 to 19 days ago
            Files.setAttribute(testFile, eachAttribute,
                    FileTime.from(Instant.now().minus(Duration.ofDays(testFileNumber))));
        }
    }

    final SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("test");
    // Set before the streaming context is created; setting it afterwards had no effect.
    sparkConf.set("spark.streaming.minRememberDuration", String.valueOf(Integer.MAX_VALUE));
    final JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    final JavaDStream<String> input = SparkUtil.textFileStreamIncludingExisting(context,
            String.valueOf(testDir.toUri()));
    // count records read (one record per test file)
    final Accumulator<Integer> accumulator = context.sparkContext().accumulator(0);

    // setup async wait: released by the first batch that contains no records
    final Semaphore done = new Semaphore(1);
    done.acquire();
    input.foreachRDD(new Function<JavaRDD<String>, Void>()
    {
        @Override
        public Void call(JavaRDD<String> v1) throws Exception
        {
            if (v1.count() == 0)
            {
                done.release();
            }
            accumulator.add((int) v1.count());
            return null;
        }
    });
    context.start();
    // wait for completion or 20 sec
    done.tryAcquire(20, TimeUnit.SECONDS);
    context.stop();

    assertThat(accumulator.value(), is(testFileNumberLimit));

    for (Path eachTempFile : tempFiles)
    {
        Files.deleteIfExists(eachTempFile);
    }
    Files.deleteIfExists(testDir);
}
Re: fileStream with old files
Posted by Tathagata Das <td...@databricks.com>.
It was added, but it's not documented publicly. I am planning to rename the conf to
spark.streaming.fileStream.minRememberDuration to make it easier to understand.
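Until the rename lands, a job can keep using the current key, and optionally set the proposed name as well so it keeps working if the rename ships as-is; the new key is only a plan at this point, and the value here is just an example:

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
        .setAppName("file-stream-job")
        // current, undocumented key
        .set("spark.streaming.minRememberDuration", "86400")
        // proposed replacement name; harmless if the running version ignores it
        .set("spark.streaming.fileStream.minRememberDuration", "86400");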
Re: fileStream with old files
Posted by Terry Hole <hu...@gmail.com>.
A new configuration named spark.streaming.minRememberDuration was added in 1.2.1 to control the
file stream input. The default value is 60 seconds; you can set it to a larger value to include
older files (older than 1 minute).
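For example, a minimal sketch of wiring this up; the directory, duration, and app name below are placeholders, and note the value is set on the SparkConf before the streaming context is constructed:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setAppName("backlog-recovery")
        .set("spark.streaming.minRememberDuration", "604800"); // e.g. 7 days; the thread uses a plain number of seconds

JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(1));

// newFilesOnly = false so files already present in the directory are considered,
// as long as their modification time falls within the remember window above.
JavaPairInputDStream<LongWritable, Text> stream = context.fileStream("hdfs:///data/incoming",
        LongWritable.class, Text.class, TextInputFormat.class, p -> true, false);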
You can get the details from this JIRA:
https://issues.apache.org/jira/browse/SPARK-3276
-Terry