Posted to commits@beam.apache.org by "Luke Cwik (JIRA)" <ji...@apache.org> on 2017/06/14 15:46:02 UTC
[jira] [Created] (BEAM-2448) FileBasedSink writing to incorrect
path when path prefix has no file component in path (e.g. /tmp/)
Luke Cwik created BEAM-2448:
-------------------------------
Summary: FileBasedSink writing to incorrect path when path prefix has no file component in path (e.g. /tmp/)
Key: BEAM-2448
URL: https://issues.apache.org/jira/browse/BEAM-2448
Project: Beam
Issue Type: Bug
Components: sdk-java-core
Reporter: Luke Cwik
Priority: Minor
This pipeline (where WindowedFilenamePolicy is the one found in org.apache.beam.sdk.io.AvroIOTest) produces files in the wrong directory:
{code}
stringsPCollection
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
    .apply("WritingToOutput",
        TextIO.write()
            .withWindowedWrites()
            .withFilenamePolicy(new WindowedFilenamePolicy("my_pref"))
            .to("/tmp/")
            .withNumShards(1));
{code}
I expected the output files to be written to the /tmp/ directory, but they are written to the root directory (/) instead.
I think the problem is in org.apache.beam.sdk.io.FileBasedSink.ExtractDirectory ...
This main method shows the problem:
{code}
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

public class Test {
  public static void main(String[] args) {
    // Parse a prefix that names a directory and has no file component.
    ResourceId res = FileBasedSink.convertToFileResourceIfPossible("/tmp/");
    System.out.println("Resource is " + res + ", current directory is "
        + res.getCurrentDirectory() + ", filename is " + res.getFilename());

    // Minimal anonymous sink, used only to read back the base output directory.
    FileBasedSink<String> mockFBS =
        new FileBasedSink<String>(StaticValueProvider.of(res), null) {
          @Override
          public FileBasedSink.WriteOperation<String> createWriteOperation() {
            return null;
          }
        };

    final ValueProvider<ResourceId> provider = mockFBS.getBaseOutputDirectoryProvider();
    System.out.println("BaseOutputProvider is " + provider
        + ", isAccessible=" + provider.isAccessible() + ", getValue=" + provider.get());
  }
}
{code}
The output is
{code}
Resource is /tmp, current directory is //, filename is tmp
BaseOutputProvider is NestedValueProvider{value=StaticValueProvider{value=/tmp}}, isAccessible=true, getValue=//
{code}
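The output above is consistent with the trailing slash being dropped when the prefix is parsed, so that "tmp" is then treated as a filename. The sketch below illustrates that effect using only java.nio.file on a Unix-like system. It is an assumption that Beam's local file handling behaves the same way; this issue does not confirm it. TrailingSlashDemo and baseDirectory are hypothetical names used for illustration and are not part of Beam.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class TrailingSlashDemo {
    // Hypothetical helper, not part of Beam: treats a prefix ending in "/"
    // as a directory, and otherwise uses the parent of the parsed path.
    static Path baseDirectory(String prefix) {
        Path path = Paths.get(prefix);
        if (prefix.endsWith("/")) {
            // The caller explicitly named a directory, so keep it as-is.
            return path;
        }
        Path parent = path.getParent();
        // A bare relative name has no parent; use the empty path,
        // which denotes the current directory.
        return parent != null ? parent : Paths.get("");
    }

    public static void main(String[] args) {
        // Parsing drops the trailing slash, so the directory marker is lost.
        Path parsed = Paths.get("/tmp/");
        System.out.println("parsed=" + parsed);
        System.out.println("fileName=" + parsed.getFileName());
        System.out.println("parent=" + parsed.getParent());

        // Checking the raw string before parsing preserves the intent.
        System.out.println("baseDirectory(/tmp/)=" + baseDirectory("/tmp/"));
        System.out.println("baseDirectory(/tmp/my_pref)=" + baseDirectory("/tmp/my_pref"));
    }
}
```

Both calls to baseDirectory resolve to /tmp in this sketch, whereas taking the parent of the parsed "/tmp/" yields the root directory, which matches the behavior reported here.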