You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/07/08 09:01:32 UTC
[incubator-wayang] branch multi-spark updated: [WAYANG-multi-plugin] modification and test for multiplugin
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch multi-spark
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
The following commit(s) were added to refs/heads/multi-spark by this push:
new 41ca31d2 [WAYANG-multi-plugin] modification and test for multiplugin
41ca31d2 is described below
commit 41ca31d20931642de545e44cc07a8dc1253070b5
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Fri Jul 8 11:01:25 2022 +0200
[WAYANG-multi-plugin] modification and test for multiplugin
Signed-off-by: bertty <be...@apache.org>
---
.../org/apache/wayang/core/api/Configuration.java | 5 +++
.../core/plan/executionplan/ExecutionTask.java | 2 +-
.../wayang/spark/channels/ChannelConversions.java | 2 +-
.../wayang/spark/mapping/CartesianMapping.java | 2 ++
.../apache/wayang/spark/mapping/FilterMapping.java | 23 ++++++++++++--
.../spark/mapping/LocalCallbackSinkMapping.java | 23 ++++++++++++--
.../org/apache/wayang/spark/mapping/Mappings.java | 15 +++++++--
.../spark/mapping/TextFileSourceMapping.java | 22 +++++++++++--
.../spark/operators/SparkExecutionOperator.java | 3 ++
.../spark/operators/SparkFilterOperator.java | 16 ++++++++++
.../spark/operators/SparkLocalCallbackSink.java | 16 ++++++++++
.../spark/operators/SparkTextFileSource.java | 17 +++++++++-
.../wayang/spark/platform/SparkPlatform.java | 3 +-
.../wayang/spark/plugin/SparkMultiPlugin.java | 6 ++--
.../org/apache/wayang/spark/test/DoubleSpark.java | 14 ++++++---
.../code/test/resources/log4j2.properties | 36 ++++++++++++++++++++++
.../code/test/resources/simplelogger.properties | 25 +++++++++++++++
17 files changed, 209 insertions(+), 21 deletions(-)
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java
index 966ecc50..1c695a42 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java
@@ -682,6 +682,11 @@ public class Configuration {
}
public CollectionProvider<Class<PlanEnumerationPruningStrategy>> getPruningStrategyClassProvider() {
+ this.pruningStrategyClassProvider
+ .provideAll()
+ .stream()
+ .map(obj -> obj.getSimpleName())
+ .forEach(System.out::println);
return this.pruningStrategyClassProvider;
}
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java
index 75e0cc0d..13872860 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java
@@ -182,7 +182,7 @@ public class ExecutionTask {
@Override
public String toString() {
- return "T[" + this.operator + ']';
+ return "T[" + this.operator+ " Platform: {"+this.stage.getPlatformExecution()+ "}]";
// return this.getClass().getSimpleName() + "[" + this.operator + ']';
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
index 33422909..cc0daf33 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
@@ -126,7 +126,7 @@ public class ChannelConversions {
}
public static Collection<Supplier<ChannelConversion>> SUPPLIERS = Arrays.asList(
- UNCACHED_RDD_TO_CACHED_RDD(),
+ //UNCACHED_RDD_TO_CACHED_RDD(),
COLLECTION_TO_BROADCAST(),
COLLECTION_TO_UNCACHED_RDD(),
UNCACHED_RDD_TO_COLLECTION(),
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java
index 203de340..363998db 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java
@@ -36,6 +36,8 @@ import java.util.Collections;
*/
public class CartesianMapping implements Mapping {
+
+
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java
index eeac994e..3ae8cfc7 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java
@@ -25,6 +25,7 @@ import org.apache.wayang.core.mapping.OperatorPattern;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.spark.operators.SparkFilterOperator;
import org.apache.wayang.spark.platform.SparkPlatform;
@@ -38,12 +39,30 @@ import java.util.Collections;
@SuppressWarnings("unchecked")
public class FilterMapping implements Mapping {
+ private String name;
+ private String conf;
+
+ public FilterMapping(){
+ this.conf = null;
+ this.name = null;
+ }
+
+ public FilterMapping(String name, String conf){
+ this.conf = conf;
+ this.name = name;
+ }
+
+ public SparkPlatform getPlatformInstance(){
+ return (this.name == null)?
+ SparkPlatform.getInstance():
+ SparkPlatform.getInstance(this.name, this.conf);
+ }
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
this.createSubplanPattern(),
this.createReplacementSubplanFactory(),
- SparkPlatform.getInstance()
+ this.getPlatformInstance()
));
}
@@ -55,7 +74,7 @@ public class FilterMapping implements Mapping {
private ReplacementSubplanFactory createReplacementSubplanFactory() {
return new ReplacementSubplanFactory.OfSingleOperators<FilterOperator>(
- (matchedOperator, epoch) -> new SparkFilterOperator<>(matchedOperator).at(epoch)
+ (matchedOperator, epoch) -> new SparkFilterOperator<>(matchedOperator).setPlatform(this.getPlatformInstance()).at(epoch)
);
}
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java
index 301246ba..5d789030 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java
@@ -36,12 +36,31 @@ import java.util.Collections;
*/
public class LocalCallbackSinkMapping implements Mapping {
+ private String name;
+ private String conf;
+
+ public LocalCallbackSinkMapping(){
+ this.conf = null;
+ this.name = null;
+ }
+
+ public LocalCallbackSinkMapping(String name, String conf){
+ this.conf = conf;
+ this.name = name;
+ }
+
+ public SparkPlatform getPlatformInstance(){
+ return (this.name == null)?
+ SparkPlatform.getInstance():
+ SparkPlatform.getInstance(this.name, this.conf);
+ }
+
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
this.createSubplanPattern(),
this.createReplacementSubplanFactory(),
- SparkPlatform.getInstance()
+ this.getPlatformInstance()
));
}
@@ -54,7 +73,7 @@ public class LocalCallbackSinkMapping implements Mapping {
private ReplacementSubplanFactory createReplacementSubplanFactory() {
return new ReplacementSubplanFactory.OfSingleOperators<LocalCallbackSink>(
- (matchedOperator, epoch) -> new SparkLocalCallbackSink<>(matchedOperator).at(epoch)
+ (matchedOperator, epoch) -> new SparkLocalCallbackSink<>(matchedOperator).setPlatform(this.getPlatformInstance()).at(epoch)
);
}
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 4e266405..4cd51e28 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -65,11 +65,22 @@ public class Mappings {
new PageRankMapping()
);
- public static Collection<Mapping> getBasicMappings(){
+ public static Collection<Mapping> getBasicMappings(String name, String conf){
return BASIC_MAPPINGS.stream()
.map(mapping -> {
try {
- return mapping.getClass().getConstructor().newInstance();
+ Class<Mapping> clazz = (Class<Mapping>) mapping.getClass();
+ if(
+ clazz.getSimpleName().contains("Filter") ||
+ clazz.getSimpleName().contains("LocalCallbackSink") ||
+ clazz.getSimpleName().contains("TextFileSource")
+ ){
+ return clazz.getConstructor(String.class, String.class).newInstance(
+ name, conf
+ );
+ }
+
+ return clazz.getConstructor().newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java
index 09063e15..e78ed525 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java
@@ -37,12 +37,30 @@ import java.util.Collections;
*/
public class TextFileSourceMapping implements Mapping {
+ private String name;
+ private String conf;
+
+ public TextFileSourceMapping(){
+ this.conf = null;
+ this.name = null;
+ }
+
+ public TextFileSourceMapping(String name, String conf){
+ this.conf = conf;
+ this.name = name;
+ }
+
+ public SparkPlatform getPlatformInstance(){
+ return (this.name == null)?
+ SparkPlatform.getInstance():
+ SparkPlatform.getInstance(this.name, this.conf);
+ }
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
this.createSubplanPattern(),
this.createReplacementSubplanFactory(),
- SparkPlatform.getInstance()
+ this.getPlatformInstance()
));
}
@@ -55,7 +73,7 @@ public class TextFileSourceMapping implements Mapping {
private ReplacementSubplanFactory createReplacementSubplanFactory() {
return new ReplacementSubplanFactory.OfSingleOperators<TextFileSource>(
- (matchedOperator, epoch) -> new SparkTextFileSource(matchedOperator).at(epoch)
+ (matchedOperator, epoch) -> new SparkTextFileSource(matchedOperator).setPlatform(this.getPlatformInstance()).at(epoch)
);
}
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java
index 80fb5ab7..9910f7be 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java
@@ -22,7 +22,9 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.spark.execution.SparkExecutor;
@@ -39,6 +41,7 @@ public interface SparkExecutionOperator extends ExecutionOperator {
default SparkPlatform getPlatform() {
return SparkPlatform.getInstance();
}
+ default Operator setPlatform(SparkPlatform sparkPlatform){return null;}
/**
* Evaluates this operator. Takes a set of {@link ChannelInstance}s according to the operator inputs and manipulates
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java
index 737061e0..852aea8a 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java
@@ -41,6 +41,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import org.apache.wayang.spark.platform.SparkPlatform;
/**
* Spark implementation of the {@link FilterOperator}.
@@ -49,6 +50,8 @@ public class SparkFilterOperator<Type>
extends FilterOperator<Type>
implements SparkExecutionOperator {
+ SparkPlatform platform;
+
/**
* Creates a new instance.
*
@@ -120,4 +123,17 @@ public class SparkFilterOperator<Type>
return false;
}
+ @Override
+ public SparkPlatform getPlatform() {
+ if(this.platform == null) {
+ return SparkExecutionOperator.super.getPlatform();
+ }
+ return this.platform;
+ }
+
+ @Override
+ public SparkFilterOperator<Type> setPlatform(SparkPlatform sparkPlatform) {
+ this.platform = sparkPlatform;
+ return this;
+ }
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java
index c909d295..837a5b27 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java
@@ -35,11 +35,14 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
+import org.apache.wayang.spark.platform.SparkPlatform;
/**
* Implementation of the {@link LocalCallbackSink} operator for the Spark platform.
*/
public class SparkLocalCallbackSink<T extends Serializable> extends LocalCallbackSink<T> implements SparkExecutionOperator {
+
+ SparkPlatform platform;
/**
* Creates a new instance.
*
@@ -101,4 +104,17 @@ public class SparkLocalCallbackSink<T extends Serializable> extends LocalCallbac
return true;
}
+ @Override
+ public SparkPlatform getPlatform() {
+ if(this.platform == null) {
+ return SparkExecutionOperator.super.getPlatform();
+ }
+ return this.platform;
+ }
+
+ @Override
+ public SparkLocalCallbackSink<T> setPlatform(SparkPlatform sparkPlatform) {
+ this.platform = sparkPlatform;
+ return this;
+ }
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java
index acf9f1d4..f5bff172 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java
@@ -34,12 +34,13 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import org.apache.wayang.spark.platform.SparkPlatform;
/**
* Provides a {@link Collection} to a Spark job.
*/
public class SparkTextFileSource extends TextFileSource implements SparkExecutionOperator {
-
+ SparkPlatform platform;
public SparkTextFileSource(String inputUrl, String encoding) {
super(inputUrl, encoding);
}
@@ -109,4 +110,18 @@ public class SparkTextFileSource extends TextFileSource implements SparkExecutio
return false;
}
+
+ @Override
+ public SparkPlatform getPlatform() {
+ if(this.platform == null) {
+ return SparkExecutionOperator.super.getPlatform();
+ }
+ return this.platform;
+ }
+
+ @Override
+ public SparkTextFileSource setPlatform(SparkPlatform sparkPlatform) {
+ this.platform = sparkPlatform;
+ return this;
+ }
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index d3c7b791..f0b7442d 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -107,8 +107,6 @@ public class SparkPlatform extends Platform {
return instances.get("default");
} else {
String first = instances.keySet().stream().findFirst().get();
- System.out.println("first");
- System.out.println(first);
return instances.get(first);
}
@@ -200,6 +198,7 @@ public class SparkPlatform extends Platform {
@Override
public void configureCustom(Configuration configuration, String config) {
+ System.out.println(config);
configuration.load(ReflectionUtils.loadResource(config));
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
index 180acdba..72725306 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
@@ -25,14 +25,14 @@ public class SparkMultiPlugin implements Plugin {
@Override
public Collection<Mapping> getMappings() {
- return Mappings.getBasicMappings();
+ return Mappings.getBasicMappings(this.name, this.config);
// return Mappings.BASIC_MAPPINGS;
}
@Override
public Collection<ChannelConversion> getChannelConversions() {
-
- return ChannelConversions.ALL;
+ System.out.println("here in getChannelConversiona");
+ return ChannelConversions.getALL();
}
@Override
diff --git a/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
index cf39e24c..2781b55d 100644
--- a/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
+++ b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
@@ -7,6 +7,7 @@ import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.java.Java;
import org.apache.wayang.java.platform.JavaPlatform;
import org.apache.wayang.spark.Spark;
@@ -16,12 +17,14 @@ import java.util.List;
public class DoubleSpark {
public static void main(String[] args) {
List<String> collector = new LinkedList<>();
- TextFileSource textFileSource = new TextFileSource("file:///Users/rodrigopardomeza/files/demo.txt");
+ TextFileSource textFileSource = new TextFileSource("file:///Users/bertty/databloom/incubator-wayang/pom.xml");
textFileSource.setName("Load file");
+ textFileSource.addTargetPlatform(Spark.platform("sparky", "wayang-sparky-default.properties"));
FilterOperator<String> filterOperator = new FilterOperator<>(str -> !str.isEmpty(), String.class);
filterOperator.setName("Filter empty words");
- filterOperator.addTargetPlatform(Spark.platform("sparky", "wayang-sparky-default.properties"));
+ // filterOperator.addTargetPlatform(Spark.platform("sparky", "wayang-sparky-default.properties"));
+ filterOperator.addTargetPlatform(Java.platform());
// write results to a sink
LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(
@@ -29,15 +32,16 @@ public class DoubleSpark {
DataSetType.createDefaultUnchecked(String.class)
);
sink.setName("Collect result");
+ sink.addTargetPlatform(Spark.platform("other", "wayang-spark-defaults.properties"));
// Build Rheem plan by connecting operators
textFileSource.connectTo(0, filterOperator, 0);
filterOperator.connectTo(0, sink, 0);
WayangContext wayangContext = new WayangContext();
- //wayangContext.register(Java.basicPlugin());
+ wayangContext.register(Java.basicPlugin());
wayangContext.register(Spark.multiPlugin("sparky", "wayang-sparky-default.properties"));
-// wayangContext.register(Spark.multiPlugin("other", "wayang-spark-defaults.properties"));
+ wayangContext.register(Spark.multiPlugin("other", "wayang-spark-defaults.properties"));
// wayangContext.register(Spark.basicPlugin());
System.out.println("here");
@@ -45,6 +49,6 @@ public class DoubleSpark {
// collector.sort((t1 , t2) -> Integer.compare(t2.field1, t1.field1));
System.out.printf("Found %d words:\n", collector.size());
- collector.forEach(wc -> System.out.printf("%s\n", wc));
+ // collector.forEach(wc -> System.out.printf("%s\n", wc));
}
}
diff --git a/wayang-platforms/wayang-spark/code/test/resources/log4j2.properties b/wayang-platforms/wayang-spark/code/test/resources/log4j2.properties
new file mode 100644
index 00000000..cc70338a
--- /dev/null
+++ b/wayang-platforms/wayang-spark/code/test/resources/log4j2.properties
@@ -0,0 +1,36 @@
+status = warn
+
+appender.console.type = Console
+appender.console.name = LogToConsole
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+
+#appender.file.type = File
+#appender.file.name = LogToFile
+#appender.file.fileName=logs/app.log
+#appender.file.layout.type=PatternLayout
+#appender.file.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+
+# Rotate log file
+appender.rolling.type = RollingFile
+appender.rolling.name = LogToRollingFile
+appender.rolling.fileName = logs/app.log
+appender.rolling.filePattern = logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz
+appender.rolling.layout.type = PatternLayout
+appender.rolling.layout.pattern = %d %p %C{1.} [%t] %m%n
+appender.rolling.policies.type = Policies
+appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
+appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=10MB
+appender.rolling.strategy.type = DefaultRolloverStrategy
+appender.rolling.strategy.max = 10
+
+# Log to console and rolling file
+logger.app.name = com.mkyong
+logger.app.level = debug
+logger.app.additivity = false
+logger.app.appenderRef.rolling.ref = LogToRollingFile
+logger.app.appenderRef.console.ref = LogToConsole
+
+rootLogger.level = info
+rootLogger.appenderRef.stdout.ref = LogToConsole
\ No newline at end of file
diff --git a/wayang-platforms/wayang-spark/code/test/resources/simplelogger.properties b/wayang-platforms/wayang-spark/code/test/resources/simplelogger.properties
new file mode 100644
index 00000000..922aae45
--- /dev/null
+++ b/wayang-platforms/wayang-spark/code/test/resources/simplelogger.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.slf4j.simpleLogger.logFile = System.out
+org.slf4j.simpleLogger.defaultLogLevel = info
+org.slf4j.simpleLogger.showThreadName = false
+org.slf4j.simpleLogger.showShortName = true
+org.slf4j.simpleLogger.levelInBrackets = true
+org.slf4j.simpleLogger.log.org.apache = warn
+org.slf4j.simpleLogger.log.org.spark-project = warn