You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/06/13 10:41:30 UTC

[incubator-wayang] 02/03: [WAYANG-FLINK] Correction in the TextFileSink in Flink

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch benchmark
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 15ab0ffa69061b5b2cfcb47690fc81ad0733450b
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Tue May 17 11:22:09 2022 +0200

    [WAYANG-FLINK] Correction in the TextFileSink in Flink
    
    Signed-off-by: bertty <be...@apache.org>
---
 .../java/org/apache/wayang/flink/operators/FlinkTextFileSink.java  | 7 ++++---
 .../code/main/resources/wayang-spark-defaults.properties           | 2 +-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkTextFileSink.java b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkTextFileSink.java
index 96823444..5cc979f7 100644
--- a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkTextFileSink.java
+++ b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkTextFileSink.java
@@ -65,9 +65,10 @@ public class FlinkTextFileSink<Type> extends TextFileSink<Type> implements Flink
 
         DataSet<Type> inputDataset = ((DataSetChannel.Instance) inputs[0]).provideDataSet();
 
-        final TextOutputFormat.TextFormatter<Type> fileOutputFormat = flinkExecutor.getCompiler().compileOutput(this.formattingDescriptor);
-
-        inputDataset.writeAsFormattedText(this.textFileUrl, fileOutputFormat);
+        inputDataset.writeAsText(this.textFileUrl);
+//        final TextOutputFormat.TextFormatter<Type> fileOutputFormat = flinkExecutor.getCompiler().compileOutput(this.formattingDescriptor);
+//
+//        inputDataset.writeAsFormattedText(this.textFileUrl, fileOutputFormat);
 
         return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
     }
diff --git a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties b/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties
index 7b4feb3f..262fcbe2 100644
--- a/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties
+++ b/wayang-platforms/wayang-spark/code/main/resources/wayang-spark-defaults.properties
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-spark.master = local[1]
+spark.master = local[*]
 spark.app.name = Wayang App
 spark.ui.showConsoleProgress = false
 spark.driver.allowMultipleContexts=true