You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:18 UTC
[13/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactId names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java
new file mode 100644
index 0000000..4a57207
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.text.DecimalFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A bucket-like operator to accept merchant transaction object and dissipate the
+ * transaction amount to the further downstream operator for calculating min, max and std-deviation.
+ *
+ * @since 0.9.0
+ */
+public class MerchantTransactionBucketOperator extends BaseOperator
+{
+ private static final Logger logger = LoggerFactory.getLogger(MerchantTransactionBucketOperator.class);
+ /*
+ public final transient DefaultOutputPort<KeyValPair<MerchantKey, String>> binOutputPort =
+ new DefaultOutputPort<KeyValPair<MerchantKey, String>>();
+ */
+ public final transient DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> binCountOutputPort =
+ new DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>();
+ public final transient DefaultOutputPort<KeyValPair<MerchantKey, Long>> txOutputPort =
+ new DefaultOutputPort<KeyValPair<MerchantKey, Long>>();
+ public final transient DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>> ccAlertOutputPort =
+ new DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>>();
+ public final transient DefaultOutputPort<Map<String, Object>> summaryTxnOutputPort =
+ new DefaultOutputPort<Map<String, Object>>();
+ private MutableLong totalTxns = new MutableLong(0);
+ private MutableLong txnsInLastSecond = new MutableLong(0);
+ private MutableDouble amtInLastSecond = new MutableDouble(0);
+ private transient DecimalFormat amtFormatter = new DecimalFormat("#.##");
+ public transient DefaultInputPort<MerchantTransaction> inputPort = new DefaultInputPort<MerchantTransaction>()
+ {
+ @Override
+ public void process(MerchantTransaction tuple)
+ {
+ processTuple(tuple);
+ }
+
+ };
+ public transient DefaultInputPort<MerchantTransaction> txUserInputPort = new DefaultInputPort<MerchantTransaction>()
+ {
+ @Override
+ public void process(MerchantTransaction tuple)
+ {
+ processTuple(tuple);
+ }
+
+ };
+
+ public void endWindow()
+ {
+ Map<String, Object> summary = new HashMap<String, Object>();
+ double avg;
+ if (txnsInLastSecond.longValue() == 0) {
+ avg = 0;
+ } else {
+ avg = amtInLastSecond.doubleValue() / txnsInLastSecond.longValue();
+ }
+ summary.put("totalTxns", totalTxns);
+ summary.put("txnsInLastSecond", txnsInLastSecond);
+ summary.put("amtInLastSecond", amtFormatter.format(amtInLastSecond));
+ summary.put("avgAmtInLastSecond", amtFormatter.format(avg));
+ summaryTxnOutputPort.emit(summary);
+ txnsInLastSecond.setValue(0);
+ amtInLastSecond.setValue(0);
+ }
+
+ private void processTuple(MerchantTransaction tuple)
+ {
+ emitBankIdNumTuple(tuple, binCountOutputPort);
+ emitMerchantKeyTuple(tuple, txOutputPort);
+ emitCreditCardKeyTuple(tuple, ccAlertOutputPort);
+ totalTxns.increment();
+ txnsInLastSecond.increment();
+ amtInLastSecond.add(tuple.amount);
+ }
+
+ private void emitMerchantKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, Long>> outputPort)
+ {
+ MerchantKey key = getMerchantKey(tuple);
+ KeyValPair<MerchantKey, Long> keyValPair = new KeyValPair<MerchantKey, Long>(key, tuple.amount);
+ outputPort.emit(keyValPair);
+ }
+
+ //private void emitBankIdNumTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, String>> outputPort)
+ private void emitBankIdNumTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> outputPort)
+ {
+ MerchantKey key = getMerchantKey(tuple);
+ KeyValPair<MerchantKey, String> keyValPair = new KeyValPair<MerchantKey, String>(key, tuple.bankIdNum);
+ outputPort.emit(new KeyValPair<KeyValPair<MerchantKey, String>, Integer>(keyValPair, 1));
+ }
+
+ private void emitCreditCardKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>> outputPort)
+ {
+ MerchantKey key = getMerchantKey(tuple);
+
+ CreditCardData data = new CreditCardData();
+ data.fullCcNum = tuple.fullCcNum;
+ data.amount = tuple.amount;
+
+ KeyValPair<MerchantKey, CreditCardData> keyValPair = new KeyValPair<MerchantKey, CreditCardData>(key, data);
+ outputPort.emit(keyValPair);
+ }
+
+ private MerchantKey getMerchantKey(MerchantTransaction tuple)
+ {
+ MerchantKey key = new MerchantKey();
+ key.merchantId = tuple.merchantId;
+ key.terminalId = tuple.terminalId;
+ key.zipCode = tuple.zipCode;
+ key.country = tuple.country;
+ key.merchantType = tuple.merchantType;
+ key.userGenerated = tuple.userGenerated;
+ key.time = tuple.time;
+ return key;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java
new file mode 100644
index 0000000..2327344
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Information tuple generator with randomness.
+ *
+ * @since 0.9.0
+ */
+public class MerchantTransactionGenerator extends BaseOperator implements InputOperator
+{
+ private final Random randomNum = new Random();
+ public static final int[] zipCodes = {94086, 94087, 94088, 94089, 94090, 94091, 94092, 94093};
+ public static final String[] merchantIds = {"Wal-Mart", "Target", "Amazon", "Apple", "Sears", "Macys", "JCPenny", "Levis"};
+// public static final String bankIdNums[] = { "1111 1111 1111", "2222 2222 2222", "3333 3333 3333", "4444 4444 4444", "5555 5555 5555", "6666 6666 6666", "7777 7777 7777", "8888 8888 8888"};
+// public static final String ccNums[] = { "0001", "0002", "0003", "0004", "0005", "0006", "0007", "0008"};
+// public static final String bankIdNums[] = { "1111 1111 1111", "2222 2222 2222", "3333 3333 3333", "4444 4444 4444"};
+// public static final String ccNums[] = { "0001", "0002", "0003", "0004"};
+// public static final int zipCodes[] = { 94086, 94087, 94088, 94089, 94090};
+// public static final String merchantIds[] = { "Wal-Mart", "Target", "Amazon", "Apple"};
+// private int bankIdNumMin = 0;
+// private int bankIdNumMax = bankIdNums.length - 1;
+// private int ccMin = 0;
+// private int ccMax = ccNums.length - 1;
+ private int amountMin = 1;
+ private int amountMax = 400;
+ private int merchantIdMin = 0;
+ private int merchantIdMax = merchantIds.length - 1;
+ private int terminalIdMin = 1;
+ private int terminalIdMax = 8;
+ private int zipMin = 0;
+ private int zipMax = zipCodes.length - 1;
+ private int tupleBlastSize = 2000;
+ private boolean stopGeneration = false;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ }
+
+ public transient DefaultOutputPort<MerchantTransaction> txOutputPort =
+ new DefaultOutputPort<MerchantTransaction>();
+ public transient DefaultOutputPort<String> txDataOutputPort =
+ new DefaultOutputPort<String>();
+
+ @Override
+ public void emitTuples()
+ {
+ int count = 0;
+ List<MerchantTransaction> txList = new ArrayList();
+
+ while (!stopGeneration && count < getTupleBlastSize()) {
+
+ String bankIdNum = genBankIdNum();
+ String ccNum = genCC();
+ int merchant = genMerchantId();
+ int terminal = genTerminalId();
+ int zip = genZip();
+
+ long amount = genAmount();
+
+// int bankIdNum = 1;
+// int ccNum = 2;
+// long amount = 5000;
+// int merchant = 3;
+// int terminal = 4;
+// int zip = 0;
+
+ MerchantTransaction tx = new MerchantTransaction();
+ tx.bankIdNum = bankIdNum;
+ tx.ccNum = ccNum;
+ tx.fullCcNum = tx.bankIdNum + " " + tx.ccNum;
+ tx.amount = amount;
+ tx.merchantId = merchantIds[merchant];
+
+ // its INTERNET merchant
+ tx.merchantType = merchant == 2 || merchant == 3
+ ? MerchantTransaction.MerchantType.INTERNET
+ : MerchantTransaction.MerchantType.BRICK_AND_MORTAR;
+
+ tx.transactionType = MerchantTransaction.TransactionType.POS;
+
+ // set terminal only for a BRICK_AND_MORTAR merchant
+ if (merchant != 2 && merchant != 3) {
+ tx.terminalId = terminal;
+ }
+ tx.zipCode = zipCodes[zip];
+ tx.country = "USA";
+ tx.time = System.currentTimeMillis();
+
+ tx.userGenerated = false;
+
+ txOutputPort.emit(tx);
+
+ txList.add(tx);
+
+ count++;
+ }
+ for (MerchantTransaction txData : txList) {
+ try {
+ txDataOutputPort.emit(JsonUtils.toJson(txData));
+ } catch (IOException e) {
+ logger.warn("Exception while converting object to JSON", e);
+ }
+ }
+ txList.clear();
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public String genBankIdNum()
+ {
+ // Bank ID will be between 1000 0000 and 3500 0000 (25 BINs)
+ int base = randomNum.nextInt(100) + 100;
+ return base + "0 0000";
+ }
+
+ public String genCC()
+ {
+ // CC will be 1000 0000 to 1400 0000 (400,000 cards per BIN)
+ int base = (randomNum.nextInt(100000) + 10000000);
+ String baseString = Integer.toString(base);
+ return baseString.substring(0, 4) + " " + baseString.substring(4);
+ }
+
+ public int genAmount()
+ {
+ int lowRange = 50;
+ int range = amountMax - amountMin + randomNum.nextInt(lowRange);
+ return amountMin + randomNum.nextInt(range);
+ }
+
+ public int genMerchantId()
+ {
+ int range = merchantIdMax - merchantIdMin + 1;
+ return merchantIdMin + randomNum.nextInt(range);
+ }
+
+ public int genTerminalId()
+ {
+ int range = terminalIdMax - terminalIdMin + 1;
+ return terminalIdMin + randomNum.nextInt(range);
+ }
+
+ public int genZip()
+ {
+ int range = zipMax - zipMin + 1;
+ return zipMin + randomNum.nextInt(range);
+ }
+
+ public void setStopGeneration(boolean stopGeneration)
+ {
+ this.stopGeneration = stopGeneration;
+ }
+
+ /**
+ * @return the tupleBlastSize
+ */
+ public int getTupleBlastSize()
+ {
+ return tupleBlastSize;
+ }
+
+ /**
+ * @param tupleBlastSize the tupleBlastSize to set
+ */
+ public void setTupleBlastSize(int tupleBlastSize)
+ {
+ this.tupleBlastSize = tupleBlastSize;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(MerchantTransactionGenerator.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java
new file mode 100644
index 0000000..6dd9c2f
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Common utility class that can be used by all other operators to handle user input
+ * captured from the Web socket input port.
+ *
+ * @since 0.9.0
+ */
+public class MerchantTransactionInputHandler extends BaseOperator
+{
+ public static final String KEY_BANK_ID_NUMBER = "bankIdNum"; // first 12 digits
+ public static final String KEY_CREDIT_CARD_NUMBER = "ccNum"; // last 4 digits
+ public static final String KEY_MERCHANT_ID = "merchantId";
+ public static final String KEY_TERMINAL_ID = "terminalId";
+ public static final String KEY_ZIP_CODE = "zipCode";
+ public static final String KEY_AMOUNT = "amount";
+ public transient DefaultOutputPort<MerchantTransaction> txOutputPort =
+ new DefaultOutputPort<MerchantTransaction>();
+ public transient DefaultInputPort<Map<String, String>> userTxInputPort = new DefaultInputPort<Map<String, String>>()
+ {
+ @Override
+ public void process(Map<String, String> tuple)
+ {
+ try {
+ txOutputPort.emit(processInput(tuple));
+ } catch (Exception exc) {
+ logger.error("Exception while handling the input", exc);
+ }
+ }
+
+ };
+
+ public MerchantTransaction processInput(Map<String, String> tuple)
+ {
+ String bankIdNum = null;
+ String ccNum = null;
+ String merchantId = null;
+ Integer terminalId = null;
+ Integer zipCode = null;
+ Long amount = null;
+ for (Map.Entry<String, String> e : tuple.entrySet()) {
+ if (e.getKey().equals(KEY_BANK_ID_NUMBER)) {
+ bankIdNum = e.getValue();
+ }
+ if (e.getKey().equals(KEY_CREDIT_CARD_NUMBER)) {
+ ccNum = e.getValue();
+ }
+ if (e.getKey().equals(KEY_MERCHANT_ID)) {
+ merchantId = e.getValue();
+ }
+ if (e.getKey().equals(KEY_TERMINAL_ID)) {
+ terminalId = new Integer(e.getValue());
+ }
+ if (e.getKey().equals(KEY_ZIP_CODE)) {
+ zipCode = new Integer(e.getValue());
+ }
+ if (e.getKey().equals(KEY_AMOUNT)) {
+ amount = new Long(e.getValue());
+ }
+ }
+
+ if (bankIdNum == null || ccNum == null || merchantId == null || terminalId == null || zipCode == null || amount == null) {
+ throw new IllegalArgumentException("Missing required input!");
+ }
+
+ MerchantTransaction tx = new MerchantTransaction();
+ tx.bankIdNum = bankIdNum;
+ tx.ccNum = ccNum;
+ tx.fullCcNum = bankIdNum + " " + ccNum;
+ tx.merchantId = merchantId;
+ tx.terminalId = terminalId;
+ tx.zipCode = zipCode;
+ tx.country = "USA";
+ tx.amount = amount;
+ tx.merchantType = tx.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[2])
+ || tx.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[3])
+ ? MerchantTransaction.MerchantType.INTERNET
+ : MerchantTransaction.MerchantType.BRICK_AND_MORTAR;
+ tx.transactionType = MerchantTransaction.TransactionType.POS;
+
+ tx.userGenerated = true;
+ tx.time = System.currentTimeMillis();
+
+ return tx;
+
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(MerchantTransactionInputHandler.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java
new file mode 100644
index 0000000..dc2c942
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.util.ArrayList;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+import com.datatorrent.lib.multiwindow.AbstractSlidingWindowKeyVal;
+import com.datatorrent.lib.util.KeyValPair;
+
+
+/**
+ * Sliding window sum operator
+ *
+ * @since 0.9.0
+ */
+public class SlidingWindowSumKeyVal<K, V extends Number> extends AbstractSlidingWindowKeyVal<K, V, SlidingWindowSumObject>
+{
+
+ /**
+ * Output port to emit simple moving average (SMA) of last N window as Double.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSum = new DefaultOutputPort<KeyValPair<K, Double>>();
+ /**
+ * Output port to emit simple moving average (SMA) of last N window as Float.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSum = new DefaultOutputPort<KeyValPair<K, Float>>();
+ /**
+ * Output port to emit simple moving average (SMA) of last N window as Long.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Long>> longSum = new DefaultOutputPort<KeyValPair<K, Long>>();
+ /**
+ * Output port to emit simple moving average (SMA) of last N window as
+ * Integer.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSum = new DefaultOutputPort<KeyValPair<K, Integer>>();
+
+
+ @Override
+ public void processDataTuple(KeyValPair<K, V> tuple)
+ {
+ K key = tuple.getKey();
+ ArrayList<SlidingWindowSumObject> stateList = buffer.get(key);
+ if (stateList == null) {
+ stateList = new ArrayList<SlidingWindowSumObject>();
+ for (int i = 0; i < windowSize; ++i) {
+ stateList.add(new SlidingWindowSumObject());
+ }
+ buffer.put(key, stateList);
+ }
+ SlidingWindowSumObject state = stateList.get(currentstate);
+ state.add(tuple.getValue());
+ }
+
+ @Override
+ public void emitTuple(K key, ArrayList<SlidingWindowSumObject> obj)
+ {
+ double sum = 0;
+ for (int i = 0; i < obj.size(); ++i) {
+ SlidingWindowSumObject state = obj.get(i);
+ sum += state.getSum();
+ }
+ if (doubleSum.isConnected()) {
+ doubleSum.emit(new KeyValPair<K, Double>(key, sum));
+ }
+ if (floatSum.isConnected()) {
+ floatSum.emit(new KeyValPair<K, Float>(key, (float)sum));
+ }
+ if (longSum.isConnected()) {
+ longSum.emit(new KeyValPair<K, Long>(key, (long)sum));
+ }
+ if (integerSum.isConnected()) {
+ integerSum.emit(new KeyValPair<K, Integer>(key, (int)sum));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java
new file mode 100644
index 0000000..fc5f95d
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
+import com.datatorrent.lib.multiwindow.SimpleMovingAverageObject;
+
+/**
+ * State object for sliding window sum
+ *
+ * @since 0.9.0
+ */
+public class SlidingWindowSumObject extends SimpleMovingAverageObject
+{
+
+ MutableDouble sum = new MutableDouble(0);
+
+ public void add(Number n)
+ {
+ sum.add(n);
+ }
+
+ @Override
+ public double getSum()
+ {
+ return sum.doubleValue();
+ }
+
+ @Override
+ public void clear()
+ {
+ sum.setValue(0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java
new file mode 100644
index 0000000..71f3035
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.HighLow;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Operator to aggregate the min, max, sma, std-dev and variance for the given key.
+ *
+ * @since 0.9.0
+ */
+public class TransactionStatsAggregator extends BaseOperator
+{
+ public Map<MerchantKey, TransactionStatsData> aggrgateMap =
+ new HashMap<MerchantKey, TransactionStatsData>();
+ public final transient DefaultOutputPort<String> txDataOutputPort = new DefaultOutputPort<String>();
+ public final transient DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>> rangeInputPort =
+ new DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>>()
+ {
+ @Override
+ public void process(KeyValPair<MerchantKey, HighLow<Long>> tuple)
+ {
+ TransactionStatsData data = getDataObjectFromMap(tuple.getKey());
+ // HighLow is not currently typed, casting till it is fixed
+ data.min = tuple.getValue().getLow();
+ data.max = tuple.getValue().getHigh();
+ }
+
+ };
+ public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> smaInputPort =
+ new DefaultInputPort<KeyValPair<MerchantKey, Long>>()
+ {
+ @Override
+ public void process(KeyValPair<MerchantKey, Long> tuple)
+ {
+ TransactionStatsData data = getDataObjectFromMap(tuple.getKey());
+ data.sma = tuple.getValue();
+ }
+
+ };
+
+ private TransactionStatsData getDataObjectFromMap(MerchantKey key)
+ {
+ TransactionStatsData data = aggrgateMap.get(key);
+ if (data == null) {
+ data = new TransactionStatsData();
+ data.time = System.currentTimeMillis();
+ data.merchantId = key.merchantId;
+ data.terminalId = key.terminalId == null ? 0 : key.terminalId;
+ data.zipCode = key.zipCode;
+ data.merchantType = key.merchantType;
+ aggrgateMap.put(key, data);
+ }
+ return data;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ for (Map.Entry<MerchantKey, TransactionStatsData> entry : aggrgateMap.entrySet()) {
+ try {
+ txDataOutputPort.emit(JsonUtils.toJson(entry.getValue()));
+ } catch (IOException e) {
+ logger.warn("Exception while converting object to JSON", e);
+ }
+ }
+ aggrgateMap.clear();
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(TransactionStatsAggregator.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java
new file mode 100644
index 0000000..f0b2b86
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * POJO to capture transaction data related to min, max, sma, std-dev, variance.
+ *
+ * @since 0.9.0
+ */
+public class TransactionStatsData
+{
+ public String merchantId;
+ public int terminalId;
+ public int zipCode;
+ public MerchantTransaction.MerchantType merchantType;
+ public long min;
+ public long max;
+ public double sma;
+ public long time;
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java
new file mode 100644
index 0000000..89c4bcd
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect.operator;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+/**
+ * Adapter for writing Strings to HDFS.
+ * <p>
+ * Serializes tuples into an HDFS file. Every tuple is routed to the same output file,
+ * {@code <separator><application name><separator>transactions.out.part}, and part files are
+ * named by appending the part number to that name (e.g. {@code transactions.out.part0}).
+ * Tuples are written as UTF-8 bytes with no delimiter appended, so callers that need
+ * line-oriented output must include their own line terminator.
+ * </p>
+ *
+ * @since 0.9.4
+ */
+public class HdfsStringOutputOperator extends AbstractFileOutputOperator<String>
+{
+  private transient String outputFileName;
+  private transient String contextId;
+  // NOTE(review): not referenced anywhere in this class; appears to be unused.
+  private int index = 0;
+
+  /**
+   * Creates the operator with a maximum part-file length of 1 MB (1024 * 1024 bytes).
+   */
+  public HdfsStringOutputOperator()
+  {
+    setMaxLength(1024 * 1024);
+  }
+
+  /**
+   * Derives the output file name from the application name before delegating to the
+   * parent setup.
+   *
+   * @param context operator context providing {@link DAGContext#APPLICATION_NAME}
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    contextId = context.getValue(DAGContext.APPLICATION_NAME);
+    outputFileName = File.separator + contextId +
+        File.separator + "transactions.out.part";
+    super.setup(context);
+  }
+
+  /**
+   * Encodes a tuple as UTF-8.
+   * <p>
+   * The charset is explicit so the bytes written do not depend on the JVM's default
+   * platform charset, which {@code String.getBytes()} would otherwise use.
+   * </p>
+   *
+   * @param t the tuple to encode
+   * @return the UTF-8 bytes of {@code t}
+   */
+  @Override
+  public byte[] getBytesForTuple(String t)
+  {
+    return t.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Returns the same output file name for every tuple, as computed in {@link #setup}.
+   */
+  @Override
+  protected String getFileName(String tuple)
+  {
+    return outputFileName;
+  }
+
+  /**
+   * Builds a part-file name by appending the part number directly to the base file name.
+   */
+  @Override
+  public String getPartFileName(String fileName, int part)
+  {
+    return fileName + part;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java
new file mode 100644
index 0000000..e059c03
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect.operator;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+import com.mongodb.util.JSON;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+
+/**
+ * Operator that writes JSON-formatted String tuples into a MongoDB collection.
+ * <p>
+ * Each incoming tuple is parsed with {@link JSON#parse(String)} and buffered in
+ * {@link #dataList}. At the end of every window the buffered documents are inserted as one
+ * batch using the configured {@link WriteConcern}, and the buffer is then cleared.
+ * </p>
+ *
+ * @since 0.9.0
+ */
+public class MongoDBOutputOperator extends BaseOperator
+{
+  /** Host name passed to {@link MongoClient}; required. */
+  @NotNull
+  protected String hostName;
+  /** Name of the database to write to; required. */
+  @NotNull
+  protected String dataBase;
+  /** Name of the collection that documents are inserted into; required. */
+  @NotNull
+  protected String collection;
+
+  /** Write concern used for every batch insert; defaults to {@link WriteConcern#ACKNOWLEDGED}. */
+  protected WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED;
+
+  /** Optional credentials; authentication is attempted only when both are non-null. */
+  protected String userName;
+  protected String passWord;
+
+  protected transient MongoClient mongoClient;
+  protected transient DB db;
+  protected transient DBCollection dbCollection;
+
+  /** Documents received in the current window; flushed and cleared in {@link #endWindow()}. */
+  protected List<DBObject> dataList = new ArrayList<DBObject>();
+
+  public MongoDBOutputOperator()
+  {
+  }
+
+  /**
+   * Parses each JSON-formatted string tuple and buffers it for the next batch insert.
+   * Tuples whose JSON does not parse to a {@link DBObject} fail the cast with a
+   * {@link ClassCastException}.
+   */
+  public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String tuple)
+    {
+      dataList.add((DBObject)JSON.parse(tuple));
+    }
+  };
+
+  /**
+   * Connects to MongoDB, optionally authenticates, and resolves the target collection.
+   *
+   * @param context the operator context
+   * @throws IllegalArgumentException if credentials are configured but authentication fails
+   * @throws RuntimeException if the MongoDB host cannot be resolved
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    try {
+      mongoClient = new MongoClient(hostName);
+      db = mongoClient.getDB(dataBase);
+      if (userName != null && passWord != null) {
+        if (!db.authenticate(userName, passWord.toCharArray())) {
+          throw new IllegalArgumentException("MongoDB authentication failed. Illegal username and password for MongoDB!!");
+        }
+      }
+      dbCollection = db.getCollection(collection);
+    } catch (UnknownHostException ex) {
+      // Fail fast: without a client, dbCollection stays null and the first non-empty
+      // endWindow() would otherwise die with a NullPointerException far from the cause.
+      throw new RuntimeException("Unable to resolve MongoDB host: " + hostName, ex);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    // nothing
+  }
+
+  /**
+   * Inserts all documents buffered during this window in one batch, then clears the buffer.
+   */
+  @Override
+  public void endWindow()
+  {
+    logger.debug("mongo datalist size: {}", dataList.size());
+    if (!dataList.isEmpty()) {
+      WriteResult result = dbCollection.insert(dataList, writeConcern);
+      logger.debug("Result for MongoDB insert: {}", result);
+      dataList.clear();
+    }
+  }
+
+  /**
+   * Closes the MongoDB client if one was created.
+   */
+  @Override
+  public void teardown()
+  {
+    if (mongoClient != null) {
+      mongoClient.close();
+    }
+  }
+
+  public String getHostName()
+  {
+    return hostName;
+  }
+
+  public void setHostName(String hostName)
+  {
+    this.hostName = hostName;
+  }
+
+  public String getDataBase()
+  {
+    return dataBase;
+  }
+
+  public void setDataBase(String dataBase)
+  {
+    this.dataBase = dataBase;
+  }
+
+  public String getCollection()
+  {
+    return collection;
+  }
+
+  public void setCollection(String collection)
+  {
+    this.collection = collection;
+  }
+
+  public String getUserName()
+  {
+    return userName;
+  }
+
+  public void setUserName(String userName)
+  {
+    this.userName = userName;
+  }
+
+  public String getPassWord()
+  {
+    return passWord;
+  }
+
+  public void setPassWord(String passWord)
+  {
+    this.passWord = passWord;
+  }
+
+  public WriteConcern getWriteConcern()
+  {
+    return writeConcern;
+  }
+
+  public void setWriteConcern(WriteConcern writeConcern)
+  {
+    this.writeConcern = writeConcern;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(MongoDBOutputOperator.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java
new file mode 100644
index 0000000..932ef64
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect.util;
+
+import java.io.IOException;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Helper for converting arbitrary objects into JSON text.
+ *
+ * @since 0.9.0
+ */
+public class JsonUtils
+{
+  /** Single Jackson mapper instance shared by all conversions. */
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Serializes an object to its JSON representation.
+   *
+   * @param obj the object to serialize
+   * @return the JSON string produced by Jackson's {@code writeValueAsString}
+   * @throws IOException if Jackson cannot serialize the object
+   */
+  public static String toJson(Object obj) throws IOException
+  {
+    final String json = OBJECT_MAPPER.writeValueAsString(obj);
+    return json;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/resources/META-INF/properties.xml b/examples/frauddetect/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..7a42ac4
--- /dev/null
+++ b/examples/frauddetect/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,167 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+<property>
+ <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
+ <value>1000</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.ccUserAlertQueryOutput.topic</name>
+ <value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.binUserAlertOutput.topic</name>
+ <value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.avgUserAlertQueryOutput.topic</name>
+ <value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.txSummaryWsOutput.topic</name>
+ <value>examples.app.frauddetect.txSummary</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>10</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.threshold
+ </name>
+ <value>20</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.rangePerMerchant.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.txReceiver.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+<!-- property>
+ <name>dt.application.frauddetect.operator.smaPerMerchant.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property-->
+<property>
+ <name>dt.application.FraudDetectExample.operator.smaPerMerchant.windowSize
+ </name>
+ <value>30</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.movingSum.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>10</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.movingSum.windowSize
+ </name>
+ <value>3</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.avgAlerter.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.avgAlerter.threshold
+ </name>
+ <value>1200</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.amountFraudDetector.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.amountFraudDetector.threshold
+ </name>
+ <value>420</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.hostName</name>
+ <value>localhost</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.dataBase</name>
+ <value>frauddetect</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.collection</name>
+ <value>txStats</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.hostName</name>
+ <value>localhost</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.dataBase</name>
+ <value>frauddetect</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.collection</name>
+ <value>binAlerts</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.hostName</name>
+ <value>localhost</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.dataBase</name>
+ <value>frauddetect</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.collection</name>
+ <value>ccAlerts</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.hostName</name>
+ <value>localhost</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.dataBase</name>
+ <value>frauddetect</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.collection</name>
+ <value>avgAlerts</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.txStatsAggregator.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+
+<property>
+ <name>dt.application.FraudDetectExample.port.*.attr.QUEUE_CAPACITY</name>
+ <value>32000</value>
+ </property>
+ <property>
+ <name>dt.application.FraudDetectExample.operator.*.attr.MEMORY_MB</name>
+ <value>2048</value>
+ </property>
+
+</configuration>
+
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java b/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java
new file mode 100644
index 0000000..ffb1cf2
--- /dev/null
+++ b/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Fraud detection application test.
+ * <p>
+ * Smoke test that builds the fraud detection DAG in local mode, configured from the
+ * {@code dt-site-frauddetect.xml} test resource, and runs it via
+ * {@code lma.getController().run(120000)} (presumably 120 seconds — confirm the unit).
+ * </p>
+ */
+public class FrauddetectApplicationTest
+{
+
+ public FrauddetectApplicationTest()
+ {
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ // NOTE(review): every Exception is caught and only printed, so this test cannot fail on
+ // DAG construction or runtime exceptions. This looks deliberate (the app depends on
+ // external services such as MongoDB); consider asserting or logging via SLF4J instead.
+ try {
+ Application application = new Application();
+ Configuration conf = new Configuration(false);
+ conf.addResource("dt-site-frauddetect.xml");
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(application, conf);
+ lma.getController().run(120000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml b/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml
new file mode 100644
index 0000000..7a404c4
--- /dev/null
+++ b/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml
@@ -0,0 +1,173 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <property>
+ <name>dt.application.FraudDetectExample.class</name>
+ <value>org.apache.apex.examples.frauddetect.Application</value>
+ <description>An alias for the application</description>
+</property>
+<property>
+ <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
+ <value>1000</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.ccUserAlertQueryOutput.topic</name>
+ <value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.binUserAlertOutput.topic</name>
+ <value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.avgUserAlertQueryOutput.topic</name>
+ <value>examples.app.frauddetect.fraudAlert</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.txSummaryWsOutput.topic</name>
+ <value>examples.app.frauddetect.txSummary</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>10</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.threshold
+ </name>
+ <value>20</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.rangePerMerchant.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.txReceiver.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+<!-- property>
+ <name>dt.application.frauddetect.operator.smaPerMerchant.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property-->
+<property>
+ <name>dt.application.FraudDetectExample.operator.smaPerMerchant.windowSize
+ </name>
+ <value>30</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.movingSum.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>10</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.movingSum.windowSize
+ </name>
+ <value>3</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.avgAlerter.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.avgAlerter.threshold
+ </name>
+ <value>1200</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.amountFraudDetector.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.amountFraudDetector.threshold
+ </name>
+ <value>420</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.hostName</name>
+ <value>localhost</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.dataBase</name>
+ <value>frauddetect</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.collection</name>
+ <value>txStats</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.hostName</name>
+ <value>localhost</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.dataBase</name>
+ <value>frauddetect</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.collection</name>
+ <value>binAlerts</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.hostName</name>
+ <value>localhost</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.dataBase</name>
+ <value>frauddetect</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.collection</name>
+ <value>ccAlerts</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.hostName</name>
+ <value>localhost</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.dataBase</name>
+ <value>frauddetect</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.collection</name>
+ <value>avgAlerts</value>
+</property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.txStatsAggregator.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>1</value>
+</property>
+
+<property>
+ <name>dt.application.FraudDetectExample.port.*.attr.QUEUE_CAPACITY</name>
+ <value>32000</value>
+ </property>
+<property>
+ <name>dt.application.FraudDetectExample.operator.*.attr.MEMORY_MB</name>
+ <value>2048</value>
+</property>
+
+
+</configuration>
+
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/test/resources/log4j.properties b/examples/frauddetect/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/frauddetect/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/pom.xml
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/pom.xml b/examples/highlevelapi/pom.xml
new file mode 100644
index 0000000..da843b7
--- /dev/null
+++ b/examples/highlevelapi/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>malhar-examples-highlevelapi</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar High-Level API Example</name>
+ <description>Apex example applications that use the High-level API to construct a DAG</description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>target/${project.artifactId}-${project.version}.apa</file>
+ <type>apa</type>
+ </artifact>
+ </artifacts>
+ <skipAttach>false</skipAttach>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <!-- required by twitter example -->
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-core</artifactId>
+ <version>4.0.4</version>
+ </dependency>
+ <dependency>
+ <!-- required by twitter example -->
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
+ <version>4.0.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-contrib</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-stream</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>2.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>1.4.192</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.9.1</version>
+ </dependency>
+ <dependency>
+ <!--This dependency is needed for StreamingWordExtractTest-->
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>commons-compiler</artifactId>
+ <version>2.7.8</version>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <!--This dependency is needed for StreamingWordExtractTest-->
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>2.7.8</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/assemble/appPackage.xml b/examples/highlevelapi/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/highlevelapi/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
new file mode 100644
index 0000000..327c882
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam MinimalWordCount Example, written with the Apex High-Level API.
+ *
+ * <p>Reads text line by line from {@code ./src/test/resources/wordcount}, splits the lines into words, counts every
+ * word in a global window that fires early after every tuple, prints the counts to the console and sends them to a
+ * {@link Collector}.</p>
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "MinimalWordCount")
+public class MinimalWordCount implements StreamingApplication
+{
+  /** Regex matching the separators between words: any run of characters other than ASCII letters and apostrophes. */
+  private static final String WORD_DELIMITERS = "[^a-zA-Z']+";
+
+  /**
+   * Sink operator that keeps the latest count received for every word in a static map, so that code running in the
+   * same JVM (e.g. a test) can inspect it. {@link #isDone()} becomes true once a count for the word "bye" arrives.
+   */
+  public static class Collector extends BaseOperator
+  {
+    // Latest count per word; a fresh map is installed in setup(). Package-private so tests can read it directly.
+    static Map<String, Long> result;
+    // Polled from other threads (e.g. a test's exit condition), so it must be volatile for the update to be seen.
+    private static volatile boolean done = false;
+
+    /**
+     * @return true once a tuple for the word "bye" has been recorded since the last {@link #setup}
+     */
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    /**
+     * @return the map from word to the latest count recorded since the last {@link #setup}
+     */
+    public static Map<String, Long> getResult()
+    {
+      return result;
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      done = false;
+      result = new HashMap<>();
+    }
+
+    public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>()
+    {
+      @Override
+      public void process(KeyValPair<String, Long> tuple)
+      {
+        // Store the count before raising the flag, so an observer that sees done == true also sees this entry.
+        result.put(tuple.getKey(), tuple.getValue());
+        if ("bye".equals(tuple.getKey())) {
+          done = true;
+        }
+      }
+    };
+  }
+
+  /**
+   * Splits a line of text into words.
+   *
+   * <p>{@link String#split} returns a single empty token for a blank line and a leading empty token when the line
+   * starts with a delimiter; those empty tokens are dropped here so the empty string is never counted as a word.</p>
+   *
+   * @param line a line of input text
+   * @return the non-empty words of {@code line}, in order
+   */
+  private static Iterable<String> splitWords(String line)
+  {
+    // Strip leading delimiters so split() cannot produce a leading empty token; trailing empty tokens are already
+    // discarded by split().
+    String trimmed = line.replaceFirst("^" + WORD_DELIMITERS, "");
+    if (trimmed.isEmpty()) {
+      return Arrays.<String>asList();
+    }
+    return Arrays.asList(trimmed.split(WORD_DELIMITERS));
+  }
+
+  /**
+   * Populates the DAG using the High-Level API: read lines, extract words, count them per word in a global window,
+   * unwrap the windowed results, print them and send them to a {@link Collector}.
+   *
+   * @param dag the DAG to populate
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    Collector collector = new Collector();
+    // Create a stream reading from a file line by line using StreamFactory.
+    StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+        // Use a flatmap transformation to extract words from the incoming stream of lines.
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String line)
+          {
+            return splitWords(line);
+          }
+        }, name("ExtractWords"))
+        // Apply windowing to the stream for counting; here the window option is a global window.
+        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+        // Count the appearances of every word.
+        .countByKey(new Function.ToKeyValue<String, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(String word)
+          {
+            return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(word, 1L));
+          }
+        }, name("countByKey"))
+        // Format the counting result to a readable format by unwrapping the tuples.
+        .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>()
+        {
+          @Override
+          public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> windowed)
+          {
+            return windowed.getValue();
+          }
+        }, name("FormatResults"))
+        // Print the result.
+        .print(name("console"))
+        // Attach a collector to the stream to collect results.
+        .endWith(collector, collector.input, name("Collector"))
+        // Populate the DAG using the stream.
+        .populateDag(dag);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
new file mode 100644
index 0000000..5b83bd0
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.Duration;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam WindowedWordCount Example, written with the Apex High-Level API.
+ *
+ * <p>Reads the classpath resource {@code /wordcount/word.txt} line by line, splits the lines into words, attaches a
+ * random timestamp to every word, counts the words per key in time windows of {@link #WINDOW_SIZE} minutes (firing
+ * early after every tuple), prints the counts and sends them to a {@link Collector}.</p>
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "WindowedWordCount")
+public class WindowedWordCount implements StreamingApplication
+{
+  static final int WINDOW_SIZE = 1; // Window duration in minutes
+
+  /** Regex matching the separators between words: any run of punctuation or whitespace characters. */
+  private static final String WORD_DELIMITERS = "[\\p{Punct}\\s]+";
+
+  /**
+   * An input operator that reads the classpath resource {@code /wordcount/word.txt} and emits it downstream line by
+   * line, sleeping 50 ms after each read. After the end of the resource is reached it emits nothing more.
+   */
+  public static class TextInput extends BaseOperator implements InputOperator
+  {
+    private static final String RESOURCE_PATH = "/wordcount/word.txt";
+
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+    private boolean done = false;
+
+    private transient BufferedReader reader;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      done = false;
+      initReader();
+    }
+
+    /**
+     * Opens {@link #RESOURCE_PATH} from the classpath as a UTF-8 reader.
+     *
+     * @throws IllegalStateException if the resource cannot be found on the classpath
+     */
+    private void initReader()
+    {
+      InputStream resourceStream = getClass().getResourceAsStream(RESOURCE_PATH);
+      if (resourceStream == null) {
+        // getResourceAsStream reports a missing resource with null; fail with a clear message instead of an NPE.
+        throw new IllegalStateException("Classpath resource not found: " + RESOURCE_PATH);
+      }
+      reader = new BufferedReader(new InputStreamReader(resourceStream, java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public void teardown()
+    {
+      IOUtils.closeQuietly(reader);
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      if (!done) {
+        try {
+          String line = reader.readLine();
+          if (line == null) {
+            done = true;
+            reader.close();
+          } else {
+            this.output.emit(line);
+          }
+          // Leave a time gap before the next line is read.
+          Thread.sleep(50);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status before propagating, so code further up the stack can still observe it.
+          Thread.currentThread().interrupt();
+          throw Throwables.propagate(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Sink operator that records the count received for every (timestamp, word) pair in a static map, so that code
+   * running in the same JVM (e.g. a test) can inspect it. {@link #isDone()} becomes true once an event for the word
+   * "bye" arrives.
+   */
+  public static class Collector extends BaseOperator
+  {
+    private static final Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
+    // Polled from other threads (e.g. a test's exit condition), so it must be volatile for the update to be seen.
+    private static volatile boolean done = false;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      done = false;
+      // Drop results left over from an earlier run in the same JVM.
+      result.clear();
+    }
+
+    /**
+     * @return true once an event for the word "bye" has been recorded since the last {@link #setup}
+     */
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    /**
+     * @return the map from (timestamp, word) to the latest count recorded for that pair
+     */
+    public static Map<KeyValPair<Long, String>, Long> getResult()
+    {
+      return result;
+    }
+
+    public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>()
+    {
+      @Override
+      public void process(PojoEvent tuple)
+      {
+        // Store the count before raising the flag, so an observer that sees done == true also sees this entry.
+        result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount());
+        if ("bye".equals(tuple.getWord())) {
+          done = true;
+        }
+      }
+    };
+  }
+
+  /**
+   * A POJO carrying one counting result: a word, its count, and the timestamp of the windowed tuple it came from.
+   */
+  public static class PojoEvent
+  {
+    private String word;
+    private long count;
+    private long timestamp;
+
+    @Override
+    public String toString()
+    {
+      return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")";
+    }
+
+    public String getWord()
+    {
+      return word;
+    }
+
+    public void setWord(String word)
+    {
+      this.word = word;
+    }
+
+    public long getCount()
+    {
+      return count;
+    }
+
+    public void setCount(long count)
+    {
+      this.count = count;
+    }
+
+    public long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setTimestamp(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+  }
+
+  /**
+   * A map function that wraps each input string in a {@link Tuple.TimestampedTuple} with a randomly generated
+   * timestamp.
+   */
+  public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+  {
+    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+    // Wall-clock time (ms) at which this function was constructed; generated timestamps are never earlier.
+    private final long minTimestamp;
+
+    AddTimestampFn()
+    {
+      this.minTimestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    public Tuple.TimestampedTuple<String> f(String input)
+    {
+      // Pick a random timestamp in [minTimestamp, minTimestamp + RAND_RANGE), i.e. within 10 minutes after
+      // this function was constructed.
+      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
+      long randomTimestamp = minTimestamp + randMillis;
+
+      return new Tuple.TimestampedTuple<>(randomTimestamp, input);
+    }
+  }
+
+  /** A MapFunction that converts a windowed (word, count) pair into a PojoEvent. */
+  public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent>
+  {
+    @Override
+    public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+    {
+      PojoEvent row = new PojoEvent();
+      row.setTimestamp(input.getTimestamp());
+      row.setCount(input.getValue().getValue());
+      row.setWord(input.getValue().getKey());
+      return row;
+    }
+  }
+
+  /**
+   * Splits a line of text into words.
+   *
+   * <p>{@link String#split} returns a single empty token for a blank line and a leading empty token when the line
+   * starts with a delimiter; those empty tokens are dropped here so the empty string is never counted as a word.</p>
+   *
+   * @param line a line of input text
+   * @return the non-empty words of {@code line}, in order
+   */
+  private static Iterable<String> splitWords(String line)
+  {
+    // Strip leading delimiters so split() cannot produce a leading empty token; trailing empty tokens are already
+    // discarded by split().
+    String trimmed = line.replaceFirst("^" + WORD_DELIMITERS, "");
+    if (trimmed.isEmpty()) {
+      return Arrays.<String>asList();
+    }
+    return Arrays.asList(trimmed.split(WORD_DELIMITERS));
+  }
+
+  /**
+   * Populates the DAG using the High-Level API: read lines, extract words, timestamp them, count them per word in
+   * time windows, convert the results to {@link PojoEvent}s, print them and send them to a {@link Collector}.
+   *
+   * @param dag the DAG to populate
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TextInput input = new TextInput();
+    Collector collector = new Collector();
+
+    // Create stream from the TextInput operator.
+    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input"))
+
+        // Extract all the words from the input line of text.
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String line)
+          {
+            return splitWords(line);
+          }
+        }, name("ExtractWords"))
+
+        // Wrap the word with a randomly generated timestamp.
+        .map(new AddTimestampFn(), name("AddTimestampFn"));
+
+    // Apply the window and trigger options.
+    // TODO: change trigger option to atWaterMark when available.
+    WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream
+        .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)),
+        new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1));
+
+    WindowedStream<PojoEvent> wordCounts =
+        // Perform a countByKey transformation to count the appearance of each word in every time window.
+        windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> word)
+          {
+            return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(word.getTimestamp(),
+                new KeyValPair<String, Long>(word.getValue(), 1L));
+          }
+        }, name("count words"))
+
+        // Format the output and print out the result.
+        .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console"));
+
+    wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
+  }
+}