You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/15 09:33:31 UTC
apex-malhar git commit: APEXMALHAR-2399 default constructor was
directly throwing an exception
Repository: apex-malhar
Updated Branches:
refs/heads/master f22b269a9 -> bc464ab55
APEXMALHAR-2399 default constructor was directly throwing an exception
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bc464ab5
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bc464ab5
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bc464ab5
Branch: refs/heads/master
Commit: bc464ab55de19433a3b20b628a5f8b7cc5face1d
Parents: f22b269
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Mon Feb 6 17:49:03 2017 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Wed Feb 15 14:02:44 2017 +0530
----------------------------------------------------------------------
.../lib/window/accumulation/PojoInnerJoin.java | 2 +-
.../impl/PojoInnerJoinTestApplication.java | 417 +++++++++++++++++++
2 files changed, 418 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bc464ab5/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
index f9cf91e..9c9733e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java
@@ -42,7 +42,7 @@ public class PojoInnerJoin<InputT1, InputT2>
public PojoInnerJoin()
{
- throw new IllegalArgumentException("Please specify number of streams that are joining.");
+ // for kyro
}
public PojoInnerJoin(int num, String... keys)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bc464ab5/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
new file mode 100644
index 0000000..969e0fb
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import javax.validation.constraints.Min;
+
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.apex.malhar.lib.window.accumulation.PojoInnerJoin;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Example application to show usage of Windowed Merge Operator using PojoInnerJoin accumulation.
+ */
+public class PojoInnerJoinTestApplication implements StreamingApplication
+{
+ private static int records = 0;
+ private static int SalesCount = 0;
+ private static int ProductCount = 0;
+
+ public static class POJOGenerator implements InputOperator
+ {
+ @Min(1)
+ private int maxProductId = 1;
+ @Min(1)
+ private int maxCustomerId = 100000;
+ @Min(1)
+ int totalTuples;
+ private int maxProductCategories = 100;
+ private double maxAmount = 100.0;
+ private long tuplesCounter;
+ private long time;
+ private long timeIncrement;
+ private boolean isSalesEvent = true;
+
+ // Limit number of emitted tuples per window
+ @Min(0)
+ private long maxTuplesPerWindow = 100;
+ private final Random random = new Random();
+ public final transient DefaultOutputPort<Tuple.WindowedTuple<SalesEvent>> outputsales = new DefaultOutputPort<>();
+ public final transient DefaultOutputPort<Tuple.WindowedTuple<ProductEvent>> outputproduct = new DefaultOutputPort<>();
+ private static final long windowDuration = 1000;
+ private static WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();
+ public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();
+
+ private long watermarkTime;
+ private long startingTime;
+
+ public static Window.TimeWindow assignTestWindow(long timestamp)
+ {
+ long beginTimestamp = timestamp - timestamp % windowDuration;
+ Window.TimeWindow window = new Window.TimeWindow(beginTimestamp, windowDuration);
+ if (!windowStateMap.containsWindow(window)) {
+ windowStateMap.put(window, new WindowState());
+ }
+ return window;
+ }
+
+ public POJOGenerator(int maxProductId, int totalTuples )
+ {
+ this.maxProductId = maxProductId;
+ this.totalTuples = totalTuples;
+ }
+
+ public POJOGenerator()
+ {
+ //for kyro
+ }
+
+ @Override
+ public void beginWindow(long l)
+ {
+
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (tuplesCounter < totalTuples) {
+ watermarkDefaultOutputPort.emit(new WatermarkImpl(watermarkTime));
+ }
+ time += timeIncrement;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ //super.setup(context);
+ startingTime = System.currentTimeMillis();
+ watermarkTime = System.currentTimeMillis() + 10000;
+ tuplesCounter = 0;
+ time = System.currentTimeMillis();
+ timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+ context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ }
+
+ SalesEvent generateSalesEvent() throws Exception
+ {
+ SalesEvent salesEvent = new SalesEvent();
+ salesEvent.productId = randomId(maxProductId);
+ salesEvent.customerId = randomId(maxCustomerId);
+ salesEvent.amount = randomAmount();
+ salesEvent.timestamps = time;
+ return salesEvent;
+ }
+
+ ProductEvent generateProductEvent() throws Exception
+ {
+ ProductEvent productEvent = new ProductEvent();
+ productEvent.productId = randomId(maxProductId);
+ productEvent.productCategory = randomId(maxProductCategories);
+ productEvent.timestamp = time;
+ return productEvent;
+ }
+
+ private int randomId(int max)
+ {
+ if (max < 1) {
+ return 1;
+ }
+ return 1 + random.nextInt(max);
+ }
+
+ private double randomAmount()
+ {
+ return maxAmount * random.nextDouble();
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ try {
+ while (tuplesCounter < totalTuples) {
+ //if (System.currentTimeMillis() - startingTime >= (i + 1) * 400) {
+ if (isSalesEvent) {
+ SalesEvent event = generateSalesEvent();
+ this.outputsales.emit(new Tuple.WindowedTuple<SalesEvent>(assignTestWindow(System.currentTimeMillis()), event));
+ SalesCount++;
+ } else {
+ ProductEvent event = generateProductEvent();
+ this.outputproduct.emit(new Tuple.WindowedTuple<ProductEvent>(assignTestWindow(System.currentTimeMillis()), event));
+ ProductCount++;
+ }
+ tuplesCounter++;
+ //}
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public static class SalesEvent
+ {
+ public int customerId;
+ public int productId;
+ public int productCategory;
+ public double amount;
+ public long timestamps;
+
+ public int getCustomerId()
+ {
+ return customerId;
+ }
+
+ public void setCustomerId(int customerId)
+ {
+ this.customerId = customerId;
+ }
+
+ public int getProductId()
+ {
+ return productId;
+ }
+
+ public void setProductId(int productId)
+ {
+ this.productId = productId;
+ }
+
+ public int getProductCategory()
+ {
+ return productCategory;
+ }
+
+ public void setProductCategory(int productCategory)
+ {
+ this.productCategory = productCategory;
+ }
+
+ public double getAmount()
+ {
+ return amount;
+ }
+
+ public void setAmount(double amount)
+ {
+ this.amount = amount;
+ }
+
+ public long getTimestamps()
+ {
+ return timestamps;
+ }
+
+ public void setTimestamps(long timestamp)
+ {
+ this.timestamps = timestamp;
+ }
+ }
+
+
+
+ public static class ProductEvent
+ {
+ public int productId;
+ public int productCategory;
+ public long timestamp;
+
+ public int getProductId()
+ {
+ return productId;
+ }
+
+ public void setProductId(int productId)
+ {
+ this.productId = productId;
+ }
+
+ public int getProductCategory()
+ {
+ return productCategory;
+ }
+
+ public void setProductCategory(int productCategory)
+ {
+ this.productCategory = productCategory;
+ }
+
+ public long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+ }
+
+ public int getMaxProductId()
+ {
+ return maxProductId;
+ }
+
+ public void setMaxProductId(int maxProductId)
+ {
+ this.maxProductId = maxProductId;
+ }
+
+ public int getMaxCustomerId()
+ {
+ return maxCustomerId;
+ }
+
+ public void setMaxCustomerId(int maxCustomerId)
+ {
+ this.maxCustomerId = maxCustomerId;
+ }
+
+ public int getMaxProductCategories()
+ {
+ return maxProductCategories;
+ }
+
+ public void setMaxProductCategories(int maxProductCategories)
+ {
+ this.maxProductCategories = maxProductCategories;
+ }
+
+ public double getMaxAmount()
+ {
+ return maxAmount;
+ }
+
+ public void setMaxAmount(double maxAmount)
+ {
+ this.maxAmount = maxAmount;
+ }
+
+ public boolean isSalesEvent()
+ {
+ return isSalesEvent;
+ }
+
+ public void setSalesEvent(boolean salesEvent)
+ {
+ isSalesEvent = salesEvent;
+ }
+
+ public long getMaxTuplesPerWindow()
+ {
+ return maxTuplesPerWindow;
+ }
+
+ public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
+ {
+ this.maxTuplesPerWindow = maxTuplesPerWindow;
+ }
+ }
+
+ public static class ResultCollector extends BaseOperator
+ {
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object t)
+ {
+ records++;
+ }
+ };
+
+ }
+
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ POJOGenerator salesGenerator = dag.addOperator("Input1", new POJOGenerator(1,1));
+ POJOGenerator productGenerator = dag.addOperator("Input2", new POJOGenerator(1,1));
+ productGenerator.setSalesEvent(false);
+ WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>> op
+ = dag.addOperator("Merge", new WindowedMergeOperatorImpl<POJOGenerator.SalesEvent, POJOGenerator.ProductEvent, List<Set<Object>>, List<List<Object>>>());
+ op.setAccumulation(new PojoInnerJoin(2,"productId","productId"));
+ op.setDataStorage(new InMemoryWindowedStorage<List<Set<Object>>>());
+
+ WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage<>();
+ op.setWindowStateStorage(windowStateMap);
+ op.setWindowOption(new WindowOption.TimeWindows(Duration.millis(10)));
+
+ op.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1).accumulatingFiredPanes());
+ op.setAllowedLateness(Duration.millis(500));
+ ResultCollector results = dag.addOperator("results", new ResultCollector());
+
+
+ dag.addStream("SalesToJoin", salesGenerator.outputsales, op.input);
+ dag.addStream("ProductToJoin", productGenerator.outputproduct, op.input2);
+
+ dag.addStream("results", op.output, results.input);
+ dag.addStream("wm1", salesGenerator.watermarkDefaultOutputPort,op.controlInput);
+ dag.addStream("wm2", productGenerator.watermarkDefaultOutputPort,op.controlInput2);
+
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new PojoInnerJoinTestApplication(), conf);
+ LocalMode.Controller lc = lma.getController();
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return SalesCount == 1 && ProductCount == 1 && records == 2;
+ }
+ });
+ lc.run(20000);
+ Assert.assertEquals(2,records);
+ }
+}