You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/02/25 22:44:06 UTC
[1/2] incubator-apex-malhar git commit: APEXCORE-60 added iteration
demo
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 ca5ab1124 -> 8331f56da
APEXCORE-60 added iteration demo
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d13c6f77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d13c6f77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d13c6f77
Branch: refs/heads/devel-3
Commit: d13c6f77c5c1ec1dfc4387bb7b6d6ecb220831d5
Parents: 9c557fc
Author: David Yan <da...@datatorrent.com>
Authored: Thu Nov 19 13:20:38 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Feb 23 10:03:29 2016 -0800
----------------------------------------------------------------------
demos/iteration/pom.xml | 37 ++++
demos/iteration/src/assemble/appPackage.xml | 59 +++++++
.../demos/iteration/Application.java | 168 +++++++++++++++++++
.../demos/iteration/package-info.java | 22 +++
.../src/main/resources/META-INF/properties.xml | 44 +++++
.../demos/iteration/ApplicationTest.java | 86 ++++++++++
.../src/test/resources/log4j.properties | 40 +++++
demos/pom.xml | 1 +
8 files changed, 457 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/pom.xml
----------------------------------------------------------------------
diff --git a/demos/iteration/pom.xml b/demos/iteration/pom.xml
new file mode 100644
index 0000000..5891f42
--- /dev/null
+++ b/demos/iteration/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>iteration-demo</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar (incubating) Iteration Demo</name>
+  <description>DataTorrent demo application that demonstrates the iteration feature.</description>
+
+  <!-- No groupId or version is declared in this POM; both are inherited from the parent below. -->
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-demos</artifactId>
+    <version>3.4.0-incubating-SNAPSHOT</version>
+  </parent>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/iteration/src/assemble/appPackage.xml b/demos/iteration/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/demos/iteration/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <!-- Descriptor for the application package: a single jar without a top-level base directory. -->
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <!-- The module's own jar from target/ goes under /app. -->
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <!-- Everything found in target/deps goes under /lib. -->
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <!-- Only the XML files from src/site/conf go under /conf. -->
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <!-- Package metadata (for example properties.xml) goes under /META-INF. -->
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <!-- Additional resources from src/main/resources/app are merged into /app. -->
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
new file mode 100644
index 0000000..c0178d8
--- /dev/null
+++ b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.demos.iteration;
+
+import com.datatorrent.api.Context;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Iteration demo : <br>
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see the Fibonacci sequence, something like the
+ * following output on the console:
+ *
+ * <pre>
+ * 1
+ * 1
+ * 2
+ * 3
+ * 5
+ * 8
+ * 13
+ * 21
+ * 34
+ * 55
+ * ...
+ * </pre>
+ *
+ */
+@ApplicationAnnotation(name="IterationDemo")
+public class Application implements StreamingApplication
+{
+  private final static Logger LOG = LoggerFactory.getLogger(Application.class);
+  private String extraOutputFileName; // for unit test
+
+  /**
+   * Operator that produces the number sequence of the demo.
+   * <p>
+   * At the end of every window it emits {@link #currentNumber} and then adds to it the
+   * value most recently recorded from the {@link #input} port. In {@link #populateDAG}
+   * that port is fed from this operator's own {@link #output} through a delay operator,
+   * which is what makes the DAG iterative.
+   */
+  public static class FibonacciOperator extends BaseOperator
+  {
+    /** The number that will be emitted at the end of the current window. */
+    public long currentNumber = 1;
+    /** Addend for the next step; set by {@link #input}, consumed in {@link #endWindow()}. */
+    private transient long tempNum;
+
+    /**
+     * Input port whose tuples are ignored. {@link #populateDAG} connects the random
+     * event generator to it.
+     */
+    public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+        // Intentionally empty: the tuple content is not used.
+      }
+    };
+
+    /**
+     * Feedback port carrying values previously emitted on {@link #output}.
+     */
+    public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
+    {
+      @Override
+      public void process(Long tuple)
+      {
+        // While the current number is still 1 the addend is forced to 1 (this also
+        // restarts the sequence after a reset); otherwise use the fed-back value.
+        tempNum = (currentNumber == 1) ? 1 : tuple;
+      }
+    };
+
+    /** Output port on which the current number is emitted once per window. */
+    public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
+
+    /**
+     * Emits the current number, then advances it by the recorded addend. A non-positive
+     * result (long overflow) restarts the sequence at 1.
+     */
+    @Override
+    public void endWindow()
+    {
+      output.emit(currentNumber);
+      currentNumber += tempNum;
+      if (currentNumber <= 0) {
+        currentNumber = 1;
+      }
+    }
+  }
+
+  /**
+   * Operator that prints every tuple to stdout and, when an extra output file name has
+   * been configured, also writes it to that file.
+   */
+  public static class StdoutOperator extends BaseOperator
+  {
+    private String extraOutputFileName; // for unit test
+    /** Stream for the extra output file; stays null when no file name is configured. */
+    private transient PrintStream extraOutputStream;
+
+    /**
+     * This is the input port which receives the tuples that will be written to stdout.
+     */
+    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+    {
+      @Override
+      @SuppressWarnings("UseOfSystemOutOrSystemErr")
+      public void process(Object t)
+      {
+        String s = t.toString();
+        System.out.println(s);
+        if (extraOutputStream != null) {
+          extraOutputStream.println(s);
+        }
+      }
+    };
+
+    /**
+     * Opens the extra output file (with auto-flush) if a file name has been set.
+     *
+     * @throws RuntimeException wrapping the IOException if the file cannot be opened
+     */
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      if (extraOutputFileName != null) {
+        try {
+          extraOutputStream = new PrintStream(new FileOutputStream(extraOutputFileName), true);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+
+    /**
+     * Closes the extra output file if one was opened in {@link #setup}.
+     */
+    @Override
+    public void teardown()
+    {
+      // The stream only exists when a file name was configured; without this guard a
+      // regular run (no extra output file) would fail here with a NullPointerException.
+      if (extraOutputStream != null) {
+        extraOutputStream.close();
+        extraOutputStream = null;
+      }
+    }
+
+    /**
+     * @param fileName file that should additionally receive every printed tuple, or
+     *                 null to print to stdout only
+     */
+    public void setExtraOutputFileName(String fileName)
+    {
+      this.extraOutputFileName = fileName;
+    }
+  }
+
+  /**
+   * @param fileName file name handed to the console operator in {@link #populateDAG};
+   *                 null (the default) means stdout only
+   */
+  public void setExtraOutputFileName(String fileName)
+  {
+    this.extraOutputFileName = fileName;
+  }
+
+  /**
+   * Builds the DAG: rand -> FIB (ignored input), FIB -> opDelay and console,
+   * and opDelay -> FIB as the feedback stream.
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+    StdoutOperator console = new StdoutOperator();
+    console.setExtraOutputFileName(extraOutputFileName);
+    dag.addOperator("console", console);
+    dag.addStream("dummy_to_operator", rand.integer_data, fib.dummyInputPort);
+    dag.addStream("operator_to_delay", fib.output, opDelay.input, console.input);
+    // Closes the loop: the delay operator's output is FIB's feedback input.
+    dag.addStream("delay_to_operator", opDelay.output, fib.input);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
new file mode 100644
index 0000000..0d24638
--- /dev/null
+++ b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Iteration demonstration application.
+ */
+package com.datatorrent.demos.iteration;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/iteration/src/main/resources/META-INF/properties.xml b/demos/iteration/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..bf65e22
--- /dev/null
+++ b/demos/iteration/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+  <!-- Memory settings for all demos -->
+  <!-- Every setting is its own <property> element holding one name/value pair; properties are siblings and must not be nested. -->
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>512</value>
+  </property>
+  <property>
+    <name>dt.attr.DEBUG</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
+    <value>128</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
+    <value>-Xmx128M</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+    <value>128</value>
+  </property>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
new file mode 100644
index 0000000..7804fcd
--- /dev/null
+++ b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.demos.iteration;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+
+
+/**
+ * Runs the iteration demo in local mode and verifies that the lines written to the
+ * extra output file start with the expected number sequence.
+ */
+public class ApplicationTest
+{
+  @Test
+  public void testIterationApp() throws Exception
+  {
+    final String outputPath = "target/output.txt";
+    final long maxWaitMillis = 10 * 1000; // 10 seconds
+    final String[] expectedLines = {
+      "1", "1", "2", "3", "5", "8", "13", "21",
+      "34", "55", "89", "144", "233", "377", "610", "987"
+    };
+
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration configuration = new Configuration(false);
+    Application application = new Application();
+
+    // Remove any file left over from an earlier run before the application starts.
+    new File(outputPath).delete();
+    application.setExtraOutputFileName(outputPath);
+    localMode.prepareDAG(application, configuration);
+    LocalMode.Controller controller = localMode.getController();
+    controller.runAsync();
+
+    // Poll every 500 ms until the output file exceeds 50 bytes, the wait time is used
+    // up, or the thread is interrupted.
+    final long startedAt = System.currentTimeMillis();
+    boolean keepPolling = true;
+    while (keepPolling) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException interrupted) {
+        break;
+      }
+      if (new File(outputPath).length() > 50) {
+        break;
+      }
+      keepPolling = System.currentTimeMillis() - startedAt < maxWaitMillis;
+    }
+
+    controller.shutdown();
+
+    // The file must begin with the expected values, one per line, in order.
+    try (BufferedReader reader = new BufferedReader(new FileReader(outputPath))) {
+      for (String expected : expectedLines) {
+        Assert.assertEquals(expected, reader.readLine());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/iteration/src/test/resources/log4j.properties b/demos/iteration/src/test/resources/log4j.properties
new file mode 100644
index 0000000..451cff3
--- /dev/null
+++ b/demos/iteration/src/test/resources/log4j.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Root logger: DEBUG threshold, console appender only.
+log4j.rootLogger=DEBUG,CONSOLE
+
+# Console appender (the only appender attached to the root logger above).
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+# Rolling file appender writing to /tmp/app.log.
+# Defined here but not attached to the root logger above.
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+# Per-package levels: info for org.*, debug for com.datatorrent.*
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index e650ea2..032583a 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -184,6 +184,7 @@
<module>uniquecount</module>
<module>r</module>
<module>echoserver</module>
+ <module>iteration</module>
</modules>
<dependencies>
[2/2] incubator-apex-malhar git commit: Merge branch 'APEXCORE-60' of
https://github.com/davidyan74/incubator-apex-malhar into devel-3
Posted by th...@apache.org.
Merge branch 'APEXCORE-60' of https://github.com/davidyan74/incubator-apex-malhar into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/8331f56d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/8331f56d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/8331f56d
Branch: refs/heads/devel-3
Commit: 8331f56da75c7069746af1ac3388bf880bae9f39
Parents: ca5ab11 d13c6f7
Author: Thomas Weise <th...@datatorrent.com>
Authored: Thu Feb 25 13:43:02 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Thu Feb 25 13:43:02 2016 -0800
----------------------------------------------------------------------
demos/iteration/pom.xml | 37 ++++
demos/iteration/src/assemble/appPackage.xml | 59 +++++++
.../demos/iteration/Application.java | 168 +++++++++++++++++++
.../demos/iteration/package-info.java | 22 +++
.../src/main/resources/META-INF/properties.xml | 44 +++++
.../demos/iteration/ApplicationTest.java | 86 ++++++++++
.../src/test/resources/log4j.properties | 40 +++++
demos/pom.xml | 1 +
8 files changed, 457 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8331f56d/demos/pom.xml
----------------------------------------------------------------------