You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/07/08 09:01:32 UTC

[incubator-wayang] branch multi-spark updated: [WAYANG-multi-plugin] modification and test for multiplugin

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch multi-spark
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


The following commit(s) were added to refs/heads/multi-spark by this push:
     new 41ca31d2 [WAYANG-multi-plugin] modification and test for multiplugin
41ca31d2 is described below

commit 41ca31d20931642de545e44cc07a8dc1253070b5
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Fri Jul 8 11:01:25 2022 +0200

    [WAYANG-multi-plugin] modification and test for multiplugin
    
    Signed-off-by: bertty <be...@apache.org>
---
 .../org/apache/wayang/core/api/Configuration.java  |  5 +++
 .../core/plan/executionplan/ExecutionTask.java     |  2 +-
 .../wayang/spark/channels/ChannelConversions.java  |  2 +-
 .../wayang/spark/mapping/CartesianMapping.java     |  2 ++
 .../apache/wayang/spark/mapping/FilterMapping.java | 23 ++++++++++++--
 .../spark/mapping/LocalCallbackSinkMapping.java    | 23 ++++++++++++--
 .../org/apache/wayang/spark/mapping/Mappings.java  | 15 +++++++--
 .../spark/mapping/TextFileSourceMapping.java       | 22 +++++++++++--
 .../spark/operators/SparkExecutionOperator.java    |  3 ++
 .../spark/operators/SparkFilterOperator.java       | 16 ++++++++++
 .../spark/operators/SparkLocalCallbackSink.java    | 16 ++++++++++
 .../spark/operators/SparkTextFileSource.java       | 17 +++++++++-
 .../wayang/spark/platform/SparkPlatform.java       |  3 +-
 .../wayang/spark/plugin/SparkMultiPlugin.java      |  6 ++--
 .../org/apache/wayang/spark/test/DoubleSpark.java  | 14 ++++++---
 .../code/test/resources/log4j2.properties          | 36 ++++++++++++++++++++++
 .../code/test/resources/simplelogger.properties    | 25 +++++++++++++++
 17 files changed, 209 insertions(+), 21 deletions(-)

diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java
index 966ecc50..1c695a42 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java
@@ -682,6 +682,11 @@ public class Configuration {
     }
 
     public CollectionProvider<Class<PlanEnumerationPruningStrategy>> getPruningStrategyClassProvider() {
+        this.pruningStrategyClassProvider
+            .provideAll()
+            .stream()
+            .map(obj -> obj.getSimpleName())
+            .forEach(System.out::println);
         return this.pruningStrategyClassProvider;
     }
 
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java
index 75e0cc0d..13872860 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java
@@ -182,7 +182,7 @@ public class ExecutionTask {
 
     @Override
     public String toString() {
-        return "T[" + this.operator + ']';
+        return "T[" + this.operator+ " Platform: {"+this.stage.getPlatformExecution()+ "}]";
 //        return this.getClass().getSimpleName() + "[" + this.operator + ']';
     }
 
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
index 33422909..cc0daf33 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
@@ -126,7 +126,7 @@ public class ChannelConversions {
     }
 
     public static Collection<Supplier<ChannelConversion>> SUPPLIERS = Arrays.asList(
-            UNCACHED_RDD_TO_CACHED_RDD(),
+            //UNCACHED_RDD_TO_CACHED_RDD(),
             COLLECTION_TO_BROADCAST(),
             COLLECTION_TO_UNCACHED_RDD(),
             UNCACHED_RDD_TO_COLLECTION(),
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java
index 203de340..363998db 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java
@@ -36,6 +36,8 @@ import java.util.Collections;
  */
 public class CartesianMapping implements Mapping {
 
+
+
     @Override
     public Collection<PlanTransformation> getTransformations() {
         return Collections.singleton(new PlanTransformation(
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java
index eeac994e..3ae8cfc7 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java
@@ -25,6 +25,7 @@ import org.apache.wayang.core.mapping.OperatorPattern;
 import org.apache.wayang.core.mapping.PlanTransformation;
 import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
 import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.platform.Platform;
 import org.apache.wayang.core.types.DataSetType;
 import org.apache.wayang.spark.operators.SparkFilterOperator;
 import org.apache.wayang.spark.platform.SparkPlatform;
@@ -38,12 +39,30 @@ import java.util.Collections;
 @SuppressWarnings("unchecked")
 public class FilterMapping implements Mapping {
 
+    private String name;
+    private String conf;
+
+    public FilterMapping(){
+        this.conf = null;
+        this.name = null;
+    }
+
+    public FilterMapping(String name, String conf){
+        this.conf = conf;
+        this.name = name;
+    }
+
+    public SparkPlatform getPlatformInstance(){
+        return (this.name == null)?
+            SparkPlatform.getInstance():
+            SparkPlatform.getInstance(this.name, this.conf);
+    }
     @Override
     public Collection<PlanTransformation> getTransformations() {
         return Collections.singleton(new PlanTransformation(
                 this.createSubplanPattern(),
                 this.createReplacementSubplanFactory(),
-                SparkPlatform.getInstance()
+                this.getPlatformInstance()
         ));
     }
 
@@ -55,7 +74,7 @@ public class FilterMapping implements Mapping {
 
     private ReplacementSubplanFactory createReplacementSubplanFactory() {
         return new ReplacementSubplanFactory.OfSingleOperators<FilterOperator>(
-                (matchedOperator, epoch) -> new SparkFilterOperator<>(matchedOperator).at(epoch)
+                (matchedOperator, epoch) -> new SparkFilterOperator<>(matchedOperator).setPlatform(this.getPlatformInstance()).at(epoch)
         );
     }
 }
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java
index 301246ba..5d789030 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java
@@ -36,12 +36,31 @@ import java.util.Collections;
  */
 public class LocalCallbackSinkMapping implements Mapping {
 
+    private String name;
+    private String conf;
+
+    public LocalCallbackSinkMapping(){
+        this.conf = null;
+        this.name = null;
+    }
+
+    public LocalCallbackSinkMapping(String name, String conf){
+        this.conf = conf;
+        this.name = name;
+    }
+
+    public SparkPlatform getPlatformInstance(){
+        return (this.name == null)?
+            SparkPlatform.getInstance():
+            SparkPlatform.getInstance(this.name, this.conf);
+    }
+
     @Override
     public Collection<PlanTransformation> getTransformations() {
         return Collections.singleton(new PlanTransformation(
                 this.createSubplanPattern(),
                 this.createReplacementSubplanFactory(),
-                SparkPlatform.getInstance()
+                this.getPlatformInstance()
         ));
     }
 
@@ -54,7 +73,7 @@ public class LocalCallbackSinkMapping implements Mapping {
 
     private ReplacementSubplanFactory createReplacementSubplanFactory() {
         return new ReplacementSubplanFactory.OfSingleOperators<LocalCallbackSink>(
-                (matchedOperator, epoch) -> new SparkLocalCallbackSink<>(matchedOperator).at(epoch)
+                (matchedOperator, epoch) -> new SparkLocalCallbackSink<>(matchedOperator).setPlatform(this.getPlatformInstance()).at(epoch)
         );
     }
 }
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 4e266405..4cd51e28 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -65,11 +65,22 @@ public class Mappings {
             new PageRankMapping()
     );
 
-    public static Collection<Mapping> getBasicMappings(){
+    public static Collection<Mapping> getBasicMappings(String name, String conf){
         return BASIC_MAPPINGS.stream()
                 .map(mapping -> {
                             try {
-                                return mapping.getClass().getConstructor().newInstance();
+                              Class<Mapping> clazz = (Class<Mapping>) mapping.getClass();
+                              if(
+                                  clazz.getSimpleName().contains("Filter") ||
+                                  clazz.getSimpleName().contains("LocalCallbackSink") ||
+                                  clazz.getSimpleName().contains("TextFileSource")
+                              ){
+                                return clazz.getConstructor(String.class, String.class).newInstance(
+                                    name, conf
+                                );
+                              }
+
+                              return clazz.getConstructor().newInstance();
                             } catch (InstantiationException e) {
                                 e.printStackTrace();
                             } catch (IllegalAccessException e) {
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java
index 09063e15..e78ed525 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java
@@ -37,12 +37,30 @@ import java.util.Collections;
  */
 public class TextFileSourceMapping implements Mapping {
 
+    private String name;
+    private String conf;
+
+    public TextFileSourceMapping(){
+        this.conf = null;
+        this.name = null;
+    }
+
+    public TextFileSourceMapping(String name, String conf){
+        this.conf = conf;
+        this.name = name;
+    }
+
+    public SparkPlatform getPlatformInstance(){
+        return (this.name == null)?
+            SparkPlatform.getInstance():
+            SparkPlatform.getInstance(this.name, this.conf);
+    }
     @Override
     public Collection<PlanTransformation> getTransformations() {
         return Collections.singleton(new PlanTransformation(
                 this.createSubplanPattern(),
                 this.createReplacementSubplanFactory(),
-                SparkPlatform.getInstance()
+                this.getPlatformInstance()
         ));
     }
 
@@ -55,7 +73,7 @@ public class TextFileSourceMapping implements Mapping {
 
     private ReplacementSubplanFactory createReplacementSubplanFactory() {
         return new ReplacementSubplanFactory.OfSingleOperators<TextFileSource>(
-                (matchedOperator, epoch) -> new SparkTextFileSource(matchedOperator).at(epoch)
+                (matchedOperator, epoch) -> new SparkTextFileSource(matchedOperator).setPlatform(this.getPlatformInstance()).at(epoch)
         );
     }
 }
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java
index 80fb5ab7..9910f7be 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java
@@ -22,7 +22,9 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.plan.wayangplan.Operator;
 import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.Platform;
 import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
 import org.apache.wayang.core.util.Tuple;
 import org.apache.wayang.spark.execution.SparkExecutor;
@@ -39,6 +41,7 @@ public interface SparkExecutionOperator extends ExecutionOperator {
     default SparkPlatform getPlatform() {
         return SparkPlatform.getInstance();
     }
+    default Operator setPlatform(SparkPlatform sparkPlatform){return null;}
 
     /**
      * Evaluates this operator. Takes a set of {@link ChannelInstance}s according to the operator inputs and manipulates
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java
index 737061e0..852aea8a 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java
@@ -41,6 +41,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import org.apache.wayang.spark.platform.SparkPlatform;
 
 /**
  * Spark implementation of the {@link FilterOperator}.
@@ -49,6 +50,8 @@ public class SparkFilterOperator<Type>
         extends FilterOperator<Type>
         implements SparkExecutionOperator {
 
+    SparkPlatform platform;
+
     /**
      * Creates a new instance.
      *
@@ -120,4 +123,17 @@ public class SparkFilterOperator<Type>
         return false;
     }
 
+    @Override
+    public SparkPlatform getPlatform() {
+        if(this.platform == null) {
+            return SparkExecutionOperator.super.getPlatform();
+        }
+        return this.platform;
+    }
+
+    @Override
+    public SparkFilterOperator<Type> setPlatform(SparkPlatform sparkPlatform) {
+        this.platform = sparkPlatform;
+        return this;
+    }
 }
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java
index c909d295..837a5b27 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java
@@ -35,11 +35,14 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.function.Consumer;
+import org.apache.wayang.spark.platform.SparkPlatform;
 
 /**
  * Implementation of the {@link LocalCallbackSink} operator for the Spark platform.
  */
 public class SparkLocalCallbackSink<T extends Serializable> extends LocalCallbackSink<T> implements SparkExecutionOperator {
+
+    SparkPlatform platform;
     /**
      * Creates a new instance.
      *
@@ -101,4 +104,17 @@ public class SparkLocalCallbackSink<T extends Serializable> extends LocalCallbac
         return true;
     }
 
+    @Override
+    public SparkPlatform getPlatform() {
+        if(this.platform == null) {
+            return SparkExecutionOperator.super.getPlatform();
+        }
+        return this.platform;
+    }
+
+    @Override
+    public SparkLocalCallbackSink<T> setPlatform(SparkPlatform sparkPlatform) {
+        this.platform = sparkPlatform;
+        return this;
+    }
 }
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java
index acf9f1d4..f5bff172 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java
@@ -34,12 +34,13 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import org.apache.wayang.spark.platform.SparkPlatform;
 
 /**
  * Provides a {@link Collection} to a Spark job.
  */
 public class SparkTextFileSource extends TextFileSource implements SparkExecutionOperator {
-
+    SparkPlatform platform;
     public SparkTextFileSource(String inputUrl, String encoding) {
         super(inputUrl, encoding);
     }
@@ -109,4 +110,18 @@ public class SparkTextFileSource extends TextFileSource implements SparkExecutio
         return false;
     }
 
+
+    @Override
+    public SparkPlatform getPlatform() {
+        if(this.platform == null) {
+            return SparkExecutionOperator.super.getPlatform();
+        }
+        return this.platform;
+    }
+
+    @Override
+    public SparkTextFileSource setPlatform(SparkPlatform sparkPlatform) {
+        this.platform = sparkPlatform;
+        return this;
+    }
 }
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index d3c7b791..f0b7442d 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -107,8 +107,6 @@ public class SparkPlatform extends Platform {
             return instances.get("default");
         } else {
             String first = instances.keySet().stream().findFirst().get();
-            System.out.println("first");
-            System.out.println(first);
             return instances.get(first);
         }
 
@@ -200,6 +198,7 @@ public class SparkPlatform extends Platform {
 
     @Override
     public void configureCustom(Configuration configuration, String config) {
+        System.out.println(config);
         configuration.load(ReflectionUtils.loadResource(config));
     }
 
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
index 180acdba..72725306 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
@@ -25,14 +25,14 @@ public class SparkMultiPlugin  implements Plugin {
 
     @Override
     public Collection<Mapping> getMappings() {
-        return Mappings.getBasicMappings();
+        return Mappings.getBasicMappings(this.name, this.config);
 //        return Mappings.BASIC_MAPPINGS;
     }
 
     @Override
     public Collection<ChannelConversion> getChannelConversions() {
-
-        return ChannelConversions.ALL;
+        System.out.println("here in getChannelConversiona");
+        return ChannelConversions.getALL();
     }
 
     @Override
diff --git a/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
index cf39e24c..2781b55d 100644
--- a/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
+++ b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
@@ -7,6 +7,7 @@ import org.apache.wayang.core.api.WayangContext;
 import org.apache.wayang.core.plan.wayangplan.WayangPlan;
 import org.apache.wayang.core.types.DataSetType;
 import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.java.Java;
 import org.apache.wayang.java.platform.JavaPlatform;
 import org.apache.wayang.spark.Spark;
 
@@ -16,12 +17,14 @@ import java.util.List;
 public class DoubleSpark {
     public static void main(String[] args) {
         List<String> collector = new LinkedList<>();
-        TextFileSource textFileSource = new TextFileSource("file:///Users/rodrigopardomeza/files/demo.txt");
+        TextFileSource textFileSource = new TextFileSource("file:///Users/bertty/databloom/incubator-wayang/pom.xml");
         textFileSource.setName("Load file");
+        textFileSource.addTargetPlatform(Spark.platform("sparky", "wayang-sparky-default.properties"));
 
         FilterOperator<String> filterOperator = new FilterOperator<>(str -> !str.isEmpty(), String.class);
         filterOperator.setName("Filter empty words");
-        filterOperator.addTargetPlatform(Spark.platform("sparky", "wayang-sparky-default.properties"));
+       // filterOperator.addTargetPlatform(Spark.platform("sparky", "wayang-sparky-default.properties"));
+        filterOperator.addTargetPlatform(Java.platform());
 
         // write results to a sink
         LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(
@@ -29,15 +32,16 @@ public class DoubleSpark {
                 DataSetType.createDefaultUnchecked(String.class)
         );
         sink.setName("Collect result");
+        sink.addTargetPlatform(Spark.platform("other", "wayang-spark-defaults.properties"));
 
         // Build Rheem plan by connecting operators
         textFileSource.connectTo(0, filterOperator, 0);
         filterOperator.connectTo(0, sink, 0);
 
         WayangContext wayangContext = new WayangContext();
-        //wayangContext.register(Java.basicPlugin());
+        wayangContext.register(Java.basicPlugin());
         wayangContext.register(Spark.multiPlugin("sparky", "wayang-sparky-default.properties"));
-//        wayangContext.register(Spark.multiPlugin("other", "wayang-spark-defaults.properties"));
+        wayangContext.register(Spark.multiPlugin("other", "wayang-spark-defaults.properties"));
 //        wayangContext.register(Spark.basicPlugin());
 
         System.out.println("here");
@@ -45,6 +49,6 @@ public class DoubleSpark {
 
 //        collector.sort((t1 , t2) -> Integer.compare(t2.field1, t1.field1));
         System.out.printf("Found %d words:\n", collector.size());
-        collector.forEach(wc -> System.out.printf("%s\n", wc));
+       // collector.forEach(wc -> System.out.printf("%s\n", wc));
     }
 }
diff --git a/wayang-platforms/wayang-spark/code/test/resources/log4j2.properties b/wayang-platforms/wayang-spark/code/test/resources/log4j2.properties
new file mode 100644
index 00000000..cc70338a
--- /dev/null
+++ b/wayang-platforms/wayang-spark/code/test/resources/log4j2.properties
@@ -0,0 +1,36 @@
+status = warn
+
+appender.console.type = Console
+appender.console.name = LogToConsole
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+
+#appender.file.type = File
+#appender.file.name = LogToFile
+#appender.file.fileName=logs/app.log
+#appender.file.layout.type=PatternLayout
+#appender.file.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+
+# Rotate log file
+appender.rolling.type = RollingFile
+appender.rolling.name = LogToRollingFile
+appender.rolling.fileName = logs/app.log
+appender.rolling.filePattern = logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz
+appender.rolling.layout.type = PatternLayout
+appender.rolling.layout.pattern = %d %p %C{1.} [%t] %m%n
+appender.rolling.policies.type = Policies
+appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
+appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=10MB
+appender.rolling.strategy.type = DefaultRolloverStrategy
+appender.rolling.strategy.max = 10
+
+# Log to console and rolling file
+logger.app.name = com.mkyong
+logger.app.level = debug
+logger.app.additivity = false
+logger.app.appenderRef.rolling.ref = LogToRollingFile
+logger.app.appenderRef.console.ref = LogToConsole
+
+rootLogger.level = info
+rootLogger.appenderRef.stdout.ref = LogToConsole
\ No newline at end of file
diff --git a/wayang-platforms/wayang-spark/code/test/resources/simplelogger.properties b/wayang-platforms/wayang-spark/code/test/resources/simplelogger.properties
new file mode 100644
index 00000000..922aae45
--- /dev/null
+++ b/wayang-platforms/wayang-spark/code/test/resources/simplelogger.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.slf4j.simpleLogger.logFile = System.out
+org.slf4j.simpleLogger.defaultLogLevel = info
+org.slf4j.simpleLogger.showThreadName = false
+org.slf4j.simpleLogger.showShortName = true
+org.slf4j.simpleLogger.levelInBrackets = true
+org.slf4j.simpleLogger.log.org.apache = warn
+org.slf4j.simpleLogger.log.org.spark-project = warn