You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:19 UTC

[14/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java
new file mode 100644
index 0000000..34a2c56
--- /dev/null
+++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * @since 2.1.0
+ */
+public class MessageReceiver implements InputOperator, NetworkManager.ChannelListener<DatagramChannel>
+{
+  private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
+
+  private transient NetworkManager.ChannelAction<DatagramChannel> action;
+
+  //Need the sender info, using a packet for now instead of the buffer
+  private transient ByteBuffer buffer;
+  //private transient DatagramPacket packet;
+
+  private int port = 9000;
+  private int maxMesgSize = 512;
+  private int inactiveWait = 10;
+  private boolean readReady = false;
+
+  @Override
+  public void emitTuples()
+  {
+    boolean emitData = false;
+    if (readReady) {
+      //DatagramSocket socket = action.channelConfiguration.socket;
+      try {
+        //socket.receive(packet);
+        DatagramChannel channel = action.channelConfiguration.channel;
+        SocketAddress address = channel.receive(buffer);
+        if (address != null) {
+          /*
+          StringBuilder sb = new StringBuilder();
+          buffer.rewind();
+          while (buffer.hasRemaining()) {
+            sb.append(buffer.getChar());
+          }
+          String mesg = sb.toString();
+          */
+          buffer.flip();
+          String mesg = new String(buffer.array(), 0, buffer.limit());
+          logger.info("Message {}", mesg);
+          Message message = new Message();
+          message.message = mesg;
+          message.socketAddress = address;
+          messageOutput.emit(message);
+          emitData = true;
+          buffer.clear();
+        }
+        //String mesg = new String(packet.getData(), packet.getOffset(), packet.getLength());
+      } catch (IOException e) {
+        throw new RuntimeException("Error reading from channel", e);
+      }
+      // Even if we miss a readReady because of not locking we will get it again immediately
+      readReady = false;
+    }
+    if (!emitData) {
+      synchronized (buffer) {
+        try {
+          if (!readReady) {
+            buffer.wait(inactiveWait);
+          }
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+
+  }
+
+  public final transient DefaultOutputPort<Message> messageOutput = new DefaultOutputPort<Message>();
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    try {
+      //byte[] mesgData = new byte[maxMesgSize];
+      //packet = new DatagramPacket(mesgData, maxMesgSize);
+      buffer = ByteBuffer.allocate(maxMesgSize);
+      action = NetworkManager.getInstance().registerAction(port, NetworkManager.ConnectionType.UDP, this, SelectionKey.OP_READ);
+    } catch (IOException e) {
+      throw new RuntimeException("Error initializing receiver", e);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    try {
+      NetworkManager.getInstance().unregisterAction(action);
+    } catch (Exception e) {
+      throw new RuntimeException("Error shutting down receiver", e);
+    }
+  }
+
+  @Override
+  public void ready(NetworkManager.ChannelAction<DatagramChannel> action, int readyOps)
+  {
+    synchronized (buffer) {
+      readReady = true;
+      buffer.notify();
+    }
+  }
+
+  public int getPort()
+  {
+    return port;
+  }
+
+  public void setPort(int port)
+  {
+    this.port = port;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java
new file mode 100644
index 0000000..d34bf3e
--- /dev/null
+++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * @since 2.1.0
+ */
+public class MessageResponder extends BaseOperator
+{
+
+  private String responseHeader = "Response: ";
+
+  private int port = 9000;
+  private int maxMesgSize = 512;
+  private transient NetworkManager.ChannelAction<DatagramChannel> action;
+  private transient ByteBuffer buffer;
+
+  public final transient DefaultInputPort<Message> messageInput = new DefaultInputPort<Message>()
+  {
+    @Override
+    public void process(Message message)
+    {
+      String sendMesg = responseHeader + message.message;
+      SocketAddress address = message.socketAddress;
+      buffer.put(sendMesg.getBytes());
+      buffer.flip();
+      try {
+        action.channelConfiguration.channel.send(buffer, address);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      buffer.clear();
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    try {
+      buffer = ByteBuffer.allocate(maxMesgSize);
+      action = NetworkManager.getInstance().registerAction(port, NetworkManager.ConnectionType.UDP, null, 0);
+    } catch (IOException e) {
+      throw new RuntimeException("Error initializer responder", e);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    try {
+      NetworkManager.getInstance().unregisterAction(action);
+    } catch (Exception e) {
+      throw new RuntimeException("Error shutting down responder", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java
new file mode 100644
index 0000000..88cf621
--- /dev/null
+++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since 2.1.0
+ */
+public class NetworkManager implements Runnable
+{
+  private static final Logger logger = LoggerFactory.getLogger(NetworkManager.class);
+
+  public static enum ConnectionType
+  {
+    TCP,
+    UDP
+  }
+
+  private static NetworkManager _instance;
+  private Selector selector;
+
+  private volatile boolean doRun = false;
+  private Thread selThread;
+  private long selTimeout = 1000;
+  private volatile Exception selEx;
+
+  private Map<ConnectionInfo, ChannelConfiguration> channels;
+  private Map<SelectableChannel, ChannelConfiguration> channelConfigurations;
+
+  public static NetworkManager getInstance() throws IOException
+  {
+    if (_instance == null) {
+      synchronized (NetworkManager.class) {
+        if (_instance == null) {
+          _instance = new NetworkManager();
+        }
+      }
+    }
+    return _instance;
+  }
+
+  private NetworkManager() throws IOException
+  {
+    channels = new HashMap<ConnectionInfo, ChannelConfiguration>();
+    channelConfigurations = new HashMap<SelectableChannel, ChannelConfiguration>();
+  }
+
+  public synchronized <T extends SelectableChannel> ChannelAction<T> registerAction(int port, ConnectionType type, ChannelListener<T> listener, int ops) throws IOException
+  {
+    boolean startProc = (channels.size() == 0);
+    SelectableChannel channel = null;
+    SocketAddress address = new InetSocketAddress(port);
+    ConnectionInfo connectionInfo = new ConnectionInfo();
+    connectionInfo.address =  address;
+    connectionInfo.connectionType = type;
+    ChannelConfiguration channelConfiguration = channels.get(connectionInfo);
+    if (channelConfiguration == null) {
+      Object socket = null;
+      if (type == ConnectionType.TCP) {
+        SocketChannel schannel = SocketChannel.open();
+        schannel.configureBlocking(false);
+        Socket ssocket = schannel.socket();
+        ssocket.bind(address);
+        socket = ssocket;
+        channel = schannel;
+      } else if (type == ConnectionType.UDP) {
+        DatagramChannel dchannel = DatagramChannel.open();
+        dchannel.configureBlocking(false);
+        DatagramSocket dsocket = dchannel.socket();
+        dsocket.bind(address);
+        socket = dsocket;
+        channel = dchannel;
+      }
+      if (channel == null) {
+        throw new IOException("Unsupported connection type");
+      }
+      channelConfiguration = new ChannelConfiguration();
+      channelConfiguration.actions = new ConcurrentLinkedQueue<ChannelAction>();
+      channelConfiguration.channel = channel;
+      channelConfiguration.connectionInfo = connectionInfo;
+      channels.put(connectionInfo, channelConfiguration);
+      channelConfigurations.put(channel, channelConfiguration);
+    } else {
+      channel = channelConfiguration.channel;
+    }
+    ChannelAction channelAction = new ChannelAction();
+    channelAction.channelConfiguration = channelConfiguration;
+    channelAction.listener = listener;
+    channelAction.ops = ops;
+    channelConfiguration.actions.add(channelAction);
+    if (startProc) {
+      startProcess();
+    }
+    if (listener != null) {
+      channel.register(selector, ops);
+    }
+    return channelAction;
+  }
+
+  public synchronized void unregisterAction(ChannelAction action) throws IOException, InterruptedException
+  {
+    ChannelConfiguration channelConfiguration = action.channelConfiguration;
+    SelectableChannel channel = channelConfiguration.channel;
+    if (channelConfiguration != null) {
+      channelConfiguration.actions.remove(action);
+      if (channelConfiguration.actions.size() == 0) {
+        ConnectionInfo connectionInfo = channelConfiguration.connectionInfo;
+        channelConfigurations.remove(channel);
+        channels.remove(connectionInfo);
+        channel.close();
+      }
+    }
+    if (channels.size() == 0) {
+      stopProcess();
+    }
+  }
+
+  private void startProcess() throws IOException
+  {
+    selector = Selector.open();
+    doRun = true;
+    selThread = new Thread(this);
+    selThread.start();
+  }
+
+  private void stopProcess() throws InterruptedException, IOException
+  {
+    doRun = false;
+    selThread.join();
+    selector.close();
+  }
+
+  @Override
+  public void run()
+  {
+    try {
+      while (doRun) {
+        int keys = selector.select(selTimeout);
+        if (keys > 0) {
+          Set<SelectionKey> selectionKeys = selector.selectedKeys();
+          for (SelectionKey selectionKey : selectionKeys) {
+            int readyOps = selectionKey.readyOps();
+            ChannelConfiguration channelConfiguration = channelConfigurations.get(selectionKey.channel());
+            Collection<ChannelAction> actions = channelConfiguration.actions;
+            for (ChannelAction action : actions) {
+              if (((readyOps & action.ops) != 0) && (action.listener != null)) {
+                action.listener.ready(action, readyOps);
+              }
+            }
+          }
+          selectionKeys.clear();
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Error in select", e);
+      selEx = e;
+    }
+  }
+
+  public static interface ChannelListener<T extends SelectableChannel>
+  {
+    public void ready(ChannelAction<T> action, int readyOps);
+  }
+
+  public static class ChannelConfiguration<T extends SelectableChannel>
+  {
+    public T channel;
+    public ConnectionInfo connectionInfo;
+    public Collection<ChannelAction> actions;
+  }
+
+  public static class ChannelAction<T extends SelectableChannel>
+  {
+    public ChannelConfiguration<T> channelConfiguration;
+    public ChannelListener<T> listener;
+    public int ops;
+  }
+
+  private static class ConnectionInfo
+  {
+    public SocketAddress address;
+    public ConnectionType connectionType;
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      ConnectionInfo that = (ConnectionInfo)o;
+
+      if (connectionType != that.connectionType) {
+        return false;
+      }
+      if (!address.equals(that.address)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      int result = address.hashCode();
+      result = 31 * result + connectionType.hashCode();
+      return result;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/resources/META-INF/properties.xml b/examples/echoserver/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..0822f4f
--- /dev/null
+++ b/examples/echoserver/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- 
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+  </property>
+  -->
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+  <property>
+    <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
+    <value>hello world: %s</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java b/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java
new file mode 100644
index 0000000..c4a3ad4
--- /dev/null
+++ b/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest
+{
+
+  @Test
+  public void testApplication() throws IOException, Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      lma.prepareDAG(new Application(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(10000); // runs for 10 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/test/resources/log4j.properties b/examples/echoserver/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/echoserver/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/pom.xml
----------------------------------------------------------------------
diff --git a/examples/frauddetect/pom.xml b/examples/frauddetect/pom.xml
new file mode 100644
index 0000000..36d4153
--- /dev/null
+++ b/examples/frauddetect/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <artifactId>malhar-examples-frauddetect</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar Fraud Detect Example</name>
+  <description>Apex example application that demonstrates real-time pattern detection in the incoming data and alerting. The example processes streaming credit card transactions and looks for fraudulent transactions.</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <version>2.10.1</version>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/assemble/appPackage.xml b/examples/frauddetect/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/frauddetect/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
new file mode 100644
index 0000000..73c38ef
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.Serializable;
+import java.net.URI;
+import org.apache.apex.examples.frauddetect.operator.HdfsStringOutputOperator;
+import org.apache.apex.examples.frauddetect.operator.MongoDBOutputOperator;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+import com.datatorrent.lib.math.RangeKeyVal;
+import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
+import com.datatorrent.lib.util.BaseKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.DTThrowable;
+
+
+/**
+ * Fraud detection application
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "FraudDetectExample")
+public class Application implements StreamingApplication
+{
+
+  public PubSubWebSocketInputOperator getPubSubWebSocketInputOperator(String name, DAG dag, URI duri, String topic) throws Exception
+  {
+    PubSubWebSocketInputOperator reqin = dag.addOperator(name, new PubSubWebSocketInputOperator());
+    reqin.setUri(duri);
+    reqin.setTopic(topic);
+    return reqin;
+  }
+
+  public PubSubWebSocketOutputOperator getPubSubWebSocketOutputOperator(String name, DAG dag, URI duri, String topic) throws Exception
+  {
+    PubSubWebSocketOutputOperator out = dag.addOperator(name, new PubSubWebSocketOutputOperator());
+    out.setUri(duri);
+    return out;
+  }
+
+  public HdfsStringOutputOperator getHdfsOutputOperator(String name, DAG dag, String folderName)
+  {
+    HdfsStringOutputOperator oper = dag.addOperator("hdfs", HdfsStringOutputOperator.class);
+    oper.setFilePath(folderName);
+    oper.setMaxLength(1024 * 1024 * 1024);
+    return oper;
+  }
+
+  public ConsoleOutputOperator getConsoleOperator(String name, DAG dag, String prefix, String format)
+  {
+    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
+    // oper.setStringFormat(prefix + ": " + format);
+    return oper;
+  }
+
+  public static class KeyPartitionCodec<K, V> extends BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable
+  {
+    private static final long serialVersionUID = 201410031623L;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+
+    try {
+      String gatewayAddress = dag.getValue(DAGContext.GATEWAY_CONNECT_ADDRESS);
+      if (gatewayAddress == null) {
+        gatewayAddress = "localhost:9090";
+      }
+      URI duri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+      PubSubWebSocketInputOperator userTxWsInput = getPubSubWebSocketInputOperator("userTxInput", dag, duri, "examples.app.frauddetect.submitTransaction");
+      PubSubWebSocketOutputOperator ccUserAlertWsOutput = getPubSubWebSocketOutputOperator("ccUserAlertQueryOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
+      PubSubWebSocketOutputOperator avgUserAlertwsOutput = getPubSubWebSocketOutputOperator("avgUserAlertQueryOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
+      PubSubWebSocketOutputOperator binUserAlertwsOutput = getPubSubWebSocketOutputOperator("binUserAlertOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
+      PubSubWebSocketOutputOperator txSummaryWsOutput = getPubSubWebSocketOutputOperator("txSummaryWsOutput", dag, duri, "examples.app.frauddetect.txSummary");
+      SlidingWindowSumKeyVal<KeyValPair<MerchantKey, String>, Integer> smsOperator = dag.addOperator("movingSum", SlidingWindowSumKeyVal.class);
+
+      MerchantTransactionGenerator txReceiver = dag.addOperator("txReceiver", MerchantTransactionGenerator.class);
+      MerchantTransactionInputHandler txInputHandler = dag.addOperator("txInputHandler", new MerchantTransactionInputHandler());
+      BankIdNumberSamplerOperator binSampler = dag.addOperator("bankInfoFraudDetector", BankIdNumberSamplerOperator.class);
+
+      MerchantTransactionBucketOperator txBucketOperator = dag.addOperator("txFilter", MerchantTransactionBucketOperator.class);
+      RangeKeyVal rangeOperator = dag.addOperator("rangePerMerchant", new RangeKeyVal<MerchantKey, Long>());
+      SimpleMovingAverage<MerchantKey, Long> smaOperator = dag.addOperator("smaPerMerchant", SimpleMovingAverage.class);
+      TransactionStatsAggregator txStatsAggregator = dag.addOperator("txStatsAggregator", TransactionStatsAggregator.class);
+      AverageAlertingOperator avgAlertingOperator = dag.addOperator("avgAlerter", AverageAlertingOperator.class);
+      CreditCardAmountSamplerOperator ccSamplerOperator = dag.addOperator("amountFraudDetector", CreditCardAmountSamplerOperator.class);
+      HdfsStringOutputOperator hdfsOutputOperator = getHdfsOutputOperator("hdfsOutput", dag, "fraud");
+
+      MongoDBOutputOperator mongoTxStatsOperator = dag.addOperator("mongoTxStatsOutput", MongoDBOutputOperator.class);
+      MongoDBOutputOperator mongoBinAlertsOperator = dag.addOperator("mongoBinAlertsOutput", MongoDBOutputOperator.class);
+      MongoDBOutputOperator mongoCcAlertsOperator = dag.addOperator("mongoCcAlertsOutput", MongoDBOutputOperator.class);
+      MongoDBOutputOperator mongoAvgAlertsOperator = dag.addOperator("mongoAvgAlertsOutput", MongoDBOutputOperator.class);
+
+      dag.addStream("userTxStream", userTxWsInput.outputPort, txInputHandler.userTxInputPort);
+      dag.addStream("transactions", txReceiver.txOutputPort, txBucketOperator.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
+      dag.addStream("txData", txReceiver.txDataOutputPort, hdfsOutputOperator.input); // dump all tx into Hdfs
+      dag.addStream("userTransactions", txInputHandler.txOutputPort, txBucketOperator.txUserInputPort);
+      dag.addStream("bankInfoData", txBucketOperator.binCountOutputPort, smsOperator.data);
+      dag.addStream("bankInfoCount", smsOperator.integerSum, binSampler.txCountInputPort);
+      dag.addStream("filteredTransactions", txBucketOperator.txOutputPort, rangeOperator.data, smaOperator.data, avgAlertingOperator.txInputPort);
+
+      KeyPartitionCodec<MerchantKey, Long> txCodec = new KeyPartitionCodec<MerchantKey, Long>();
+      dag.setInputPortAttribute(rangeOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
+      dag.setInputPortAttribute(smaOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
+      dag.setInputPortAttribute(avgAlertingOperator.txInputPort, Context.PortContext.STREAM_CODEC, txCodec);
+
+      dag.addStream("creditCardData", txBucketOperator.ccAlertOutputPort, ccSamplerOperator.inputPort);
+      dag.addStream("txnSummaryData", txBucketOperator.summaryTxnOutputPort, txSummaryWsOutput.input);
+      dag.addStream("smaAlerts", smaOperator.doubleSMA, avgAlertingOperator.smaInputPort);
+      dag.addStream("binAlerts", binSampler.countAlertOutputPort, mongoBinAlertsOperator.inputPort);
+      dag.addStream("binAlertsNotification", binSampler.countAlertNotificationPort, binUserAlertwsOutput.input);
+      dag.addStream("rangeData", rangeOperator.range, txStatsAggregator.rangeInputPort);
+      dag.addStream("smaData", smaOperator.longSMA, txStatsAggregator.smaInputPort);
+      dag.addStream("txStatsOutput", txStatsAggregator.txDataOutputPort, mongoTxStatsOperator.inputPort);
+      dag.addStream("avgAlerts", avgAlertingOperator.avgAlertOutputPort, mongoAvgAlertsOperator.inputPort);
+      dag.addStream("avgAlertsNotification", avgAlertingOperator.avgAlertNotificationPort, avgUserAlertwsOutput.input);
+      dag.addStream("ccAlerts", ccSamplerOperator.ccAlertOutputPort, mongoCcAlertsOperator.inputPort);
+      dag.addStream("ccAlertsNotification", ccSamplerOperator.ccAlertNotificationPort, ccUserAlertWsOutput.input);
+
+    } catch (Exception exc) {
+      DTThrowable.rethrow(exc);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java
new file mode 100644
index 0000000..6aca64d
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * POJO to capture average alert data.
+ *
+ * @since 0.9.0
+ */
+public class AverageAlertData
+{
+  public String merchantId;
+  public int terminalId;
+  public int zipCode;
+  public MerchantTransaction.MerchantType merchantType;
+  public long amount;
+  public double lastSmaValue;
+  public double change;
+  public boolean userGenerated;
+  public long time;
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertingOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertingOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertingOperator.java
new file mode 100644
index 0000000..1b1b64a
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertingOperator.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+import org.apache.commons.lang.mutable.MutableDouble;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Generate an alert if the current transaction amount received on tx input port for the given key is greater by n %
+ * than the SMA of the last application window as received on the SMA input port.
+ *
+ * @since 0.9.0
+ */
+public class AverageAlertingOperator extends BaseOperator
+{
+  private static final Logger Log = LoggerFactory.getLogger(AverageAlertingOperator.class);
+  private final transient JsonFactory jsonFactory = new JsonFactory();
+  private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory);
+  private Map<MerchantKey, MutableDouble> lastSMAMap = new HashMap<MerchantKey, MutableDouble>();
+  private Map<MerchantKey, MutableDouble> currentSMAMap = new HashMap<MerchantKey, MutableDouble>();
+  private List<AverageAlertData> alerts = new ArrayList<AverageAlertData>();
+  @NotNull
+  private int threshold;
+  private static final String brickMortarAlertMsg = "Transaction amount %d exceeded by %f (last SMA %f) for Merchant %s at Terminal %d!";
+  private static final String internetAlertMsg = "Transaction amount %d exceeded by %f (last SMA %f) for Merchant %s!";
+  public final transient DefaultOutputPort<String> avgAlertOutputPort = new DefaultOutputPort<String>();
+  public final transient DefaultOutputPort<Map<String, Object>> avgAlertNotificationPort = new DefaultOutputPort<Map<String, Object>>();
+  public final transient DefaultInputPort<KeyValPair<MerchantKey, Double>> smaInputPort =
+      new DefaultInputPort<KeyValPair<MerchantKey, Double>>()
+  {
+    @Override
+    public void process(KeyValPair<MerchantKey, Double> tuple)
+    {
+      MutableDouble currentSma = currentSMAMap.get(tuple.getKey());
+      if (currentSma == null) { // first sma for the given key
+        double sma = tuple.getValue();
+        currentSMAMap.put(tuple.getKey(), new MutableDouble(sma));
+        //lastSMAMap.put(tuple.getKey(), new MutableDouble(sma));
+      } else { // move the current SMA value to the last SMA Map
+        //lastSMAMap.get(tuple.getKey()).setValue(currentSma.getValue());
+        currentSma.setValue(tuple.getValue());  // update the current SMA value
+      }
+    }
+
+  };
+  public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> txInputPort =
+      new DefaultInputPort<KeyValPair<MerchantKey, Long>>()
+  {
+    @Override
+    public void process(KeyValPair<MerchantKey, Long> tuple)
+    {
+      processTuple(tuple);
+    }
+
+  };
+
+  private void processTuple(KeyValPair<MerchantKey, Long> tuple)
+  {
+    MerchantKey merchantKey = tuple.getKey();
+    MutableDouble lastSma = lastSMAMap.get(tuple.getKey());
+    long txValue = tuple.getValue();
+    if (lastSma != null && txValue > lastSma.doubleValue()) {
+      double lastSmaValue = lastSma.doubleValue();
+      double change = txValue - lastSmaValue;
+      if (change > threshold) { // generate an alert
+        AverageAlertData data = getOutputData(merchantKey, txValue, change, lastSmaValue);
+        alerts.add(data);
+        //if (userGenerated) {   // if its user generated only the pass it to WebSocket
+        if (merchantKey.merchantType == MerchantTransaction.MerchantType.BRICK_AND_MORTAR) {
+          avgAlertNotificationPort.emit(getOutputData(data, String.format(brickMortarAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId, merchantKey.terminalId)));
+        } else { // its internet based
+          avgAlertNotificationPort.emit(getOutputData(data, String.format(internetAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId)));
+
+        }
+        //}
+      }
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    for (AverageAlertData data : alerts) {
+      try {
+        avgAlertOutputPort.emit(JsonUtils.toJson(data));
+      } catch (IOException e) {
+        logger.warn("Exception while converting object to JSON", e);
+      }
+    }
+
+    alerts.clear();
+
+    for (Map.Entry<MerchantKey, MutableDouble> entry : currentSMAMap.entrySet()) {
+      MerchantKey key = entry.getKey();
+      MutableDouble currentSma = entry.getValue();
+      MutableDouble lastSma = lastSMAMap.get(key);
+      if (lastSma == null) {
+        lastSma = new MutableDouble(currentSma.doubleValue());
+        lastSMAMap.put(key, lastSma);
+      } else {
+        lastSma.setValue(currentSma.getValue());
+      }
+    }
+  }
+
+  private AverageAlertData getOutputData(MerchantKey key, long amount, double change, double lastSmaValue)
+  {
+    AverageAlertData data = new AverageAlertData();
+
+    data.merchantId = key.merchantId;
+    data.terminalId = key.terminalId == null ? 0 : key.terminalId;
+    data.zipCode = key.zipCode;
+    data.merchantType = key.merchantType;
+    data.amount = amount;
+    data.lastSmaValue = lastSmaValue;
+    data.change = change;
+    //data.userGenerated = userGenerated;
+    data.userGenerated = key.userGenerated;
+    data.time = System.currentTimeMillis();
+
+    return data;
+  }
+
+  private Map<String, Object> getOutputData(AverageAlertData data, String msg)
+  {
+    Map<String, Object> output = new HashMap<String, Object>();
+    output.put("message", msg);
+    output.put("alertType", "aboveAvg");
+    output.put("userGenerated", "" + data.userGenerated);
+    output.put("alertData", data);
+
+    try {
+      String str = mapper.writeValueAsString(output);
+      logger.debug("user generated tx alert: " + str);
+    } catch (Exception exc) {
+      //ignore
+    }
+    return output;
+  }
+
+  public int getThreshold()
+  {
+    return threshold;
+  }
+
+  public void setThreshold(int threshold)
+  {
+    this.threshold = threshold;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(AverageAlertingOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java
new file mode 100644
index 0000000..28cd19a
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * POJO to capture data related to alerts for repetitive bank id number data usage.
+ *
+ * @since 0.9.0
+ */
+public class BankIdNumberAlertData
+{
+  public String merchantId;
+  public int terminalId;
+  public int zipCode;
+  public MerchantTransaction.MerchantType merchantType;
+  public String bankIdNum;
+  public int count;
+  public boolean userGenerated;
+  public long time;
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberKey.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberKey.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberKey.java
new file mode 100644
index 0000000..eea6b14
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberKey.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.Serializable;
+
+import com.datatorrent.lib.util.TimeBucketKey;
+
+/**
+ * Bank Id Number Key
+ *
+ * @since 0.9.0
+ */
+public class BankIdNumberKey extends TimeBucketKey implements Serializable
+{
+  public String bankIdNum;
+
+  public BankIdNumberKey()
+  {
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int key = 0;
+    key |= (1 << 1);
+    key |= (bankIdNum.hashCode());
+    return super.hashCode() ^ key;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (!(obj instanceof BankIdNumberKey)) {
+      return false;
+    }
+    return super.equals(obj)
+            && bankIdNum.equals(((BankIdNumberKey)obj).bankIdNum);
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder(super.toString());
+    sb.append("|1:").append(bankIdNum);
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java
new file mode 100644
index 0000000..0731e3c
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Count the transactions for the underlying aggregation window if the same BIN is
+ * being used for more than defined number of transactions. Output the data as needed
+ * by Mongo output operator
+ *
+ * @since 0.9.0
+ */
+public class BankIdNumberSamplerOperator extends BaseOperator
+{
+  private final transient JsonFactory jsonFactory = new JsonFactory();
+  private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory);
+  private int threshold;
+  private Map<MerchantKey, Map<String, BankIdNumData>> bankIdNumCountMap = new HashMap<MerchantKey, Map<String, BankIdNumData>>();
+  private static final String ALERT_MSG =
+      "Potential fraudulent CC transactions (same bank id %s and merchant %s) total transactions: %d";
+  /**
+   * Output the key-value pair for the BIN as key with the count as value.
+   */
+  public final transient DefaultOutputPort<String> countAlertOutputPort =
+      new DefaultOutputPort<String>();
+  public final transient DefaultOutputPort<Map<String, Object>> countAlertNotificationPort =
+      new DefaultOutputPort<Map<String, Object>>();
+
+  public int getThreshold()
+  {
+    return threshold;
+  }
+
+  public void setThreshold(int threshold)
+  {
+    this.threshold = threshold;
+  }
+
+  /*
+  public final transient DefaultInputPort<KeyValPair<MerchantKey, String>> txInputPort =
+          new DefaultInputPort<KeyValPair<MerchantKey, String>>()
+  {
+    @Override
+    public void process(KeyValPair<MerchantKey, String> tuple)
+    {
+      processTuple(tuple);
+    }
+
+  };
+
+  private void processTuple(KeyValPair<MerchantKey, String> tuple)
+  {
+    MerchantKey key = tuple.getKey();
+    Map<String, BankIdNumData> map = bankIdNumCountMap.get(key);
+    if (map == null) {
+      map = new HashMap<String, BankIdNumData>();
+      bankIdNumCountMap.put(key, map);
+    }
+    String bankIdNum = tuple.getValue();
+    BankIdNumData bankIdNumData = map.get(bankIdNum);
+    if (bankIdNumData == null) {
+      bankIdNumData = new BankIdNumData();
+      bankIdNumData.bankIdNum = bankIdNum;
+      map.put(bankIdNum, bankIdNumData);
+    }
+    bankIdNumData.count.increment();
+    if (key.userGenerated) {
+      bankIdNumData.userGenerated = true;
+    }
+  }
+  */
+
+  public final transient DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> txCountInputPort =
+      new DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>()
+  {
+    @Override
+    public void process(KeyValPair<KeyValPair<MerchantKey, String>, Integer> tuple)
+    {
+      processTuple(tuple.getKey(), tuple.getValue());
+    }
+
+  };
+
+  private void processTuple(KeyValPair<MerchantKey, String> tuple, Integer count)
+  {
+    MerchantKey key = tuple.getKey();
+    Map<String, BankIdNumData> map = bankIdNumCountMap.get(key);
+    if (map == null) {
+      map = new HashMap<String, BankIdNumData>();
+      bankIdNumCountMap.put(key, map);
+    }
+    String bankIdNum = tuple.getValue();
+    BankIdNumData bankIdNumData = map.get(bankIdNum);
+    if (bankIdNumData == null) {
+      bankIdNumData = new BankIdNumData();
+      bankIdNumData.bankIdNum = bankIdNum;
+      map.put(bankIdNum, bankIdNumData);
+    }
+    bankIdNumData.count.setValue(count);
+    if (key.userGenerated) {
+      bankIdNumData.userGenerated = true;
+    }
+  }
+
+  /**
+   * Go through the BIN Counter map and check if any of the values for the BIN exceed the threshold.
+   * If yes, generate the alert on the output port.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<MerchantKey, Map<String, BankIdNumData>> entry : bankIdNumCountMap.entrySet()) {
+      List<BankIdNumData> list = null;
+      MerchantKey key = entry.getKey();
+      if (key.merchantType == MerchantTransaction.MerchantType.INTERNET) {
+        continue;
+      }
+      list = dataOutput(entry.getValue());
+      if (list.size() > 0) {
+        for (BankIdNumData binData : list) {
+          BankIdNumberAlertData data = new BankIdNumberAlertData();
+          data.merchantId = key.merchantId;
+          data.terminalId = key.terminalId == null ? 0 : key.terminalId;
+          data.zipCode = key.zipCode;
+          data.merchantType = key.merchantType;
+          data.bankIdNum = binData.bankIdNum;
+          data.count = binData.count.intValue();
+          data.userGenerated = binData.userGenerated;
+          data.time = System.currentTimeMillis();
+          try {
+            countAlertOutputPort.emit(JsonUtils.toJson(data));
+            countAlertNotificationPort.emit(getOutputData(data));
+          } catch (IOException e) {
+            logger.warn("Exception while converting object to JSON: ", e);
+          }
+        }
+      }
+    }
+    bankIdNumCountMap.clear();
+  }
+
+  private List<BankIdNumData> dataOutput(Map<String, BankIdNumData> map)
+  {
+    List<BankIdNumData> list = new ArrayList<BankIdNumData>();
+    int count = 0;
+    for (Map.Entry<String, BankIdNumData> bankIdEntry : map.entrySet()) {
+      BankIdNumData data = bankIdEntry.getValue();
+      if (data.count.intValue() > threshold) {
+        list.add(data);
+      }
+    }
+    return list;
+  }
+
+  private Map<String, Object> getOutputData(BankIdNumberAlertData data)
+  {
+    Map<String, Object> output = new HashMap<String, Object>();
+    output.put("message", String.format(ALERT_MSG, data.bankIdNum, data.merchantId, data.count));
+    output.put("alertType", "sameBankId");
+    output.put("userGenerated", "" + data.userGenerated);
+    output.put("alertData", data);
+
+    try {
+      String str = mapper.writeValueAsString(output);
+      logger.debug("user generated tx alert: " + str);
+    } catch (Exception exc) {
+      //ignore
+    }
+
+    return output;
+  }
+
+  public static final class BankIdNumData
+  {
+    public String bankIdNum;
+    public MutableLong count = new MutableLong();
+    public boolean userGenerated = false;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(BankIdNumberSamplerOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java
new file mode 100644
index 0000000..f46f84a
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * POJO to capture data related to alerts for credit card number.
+ *
+ * @since 0.9.0
+ */
+public class CreditCardAlertData
+{
+  public String merchantId;
+  public int terminalId;
+  public int zipCode;
+  public MerchantTransaction.MerchantType merchantType;
+  public String fullCcNum;
+  public long small;
+  public long large;
+  public double threshold;
+  public boolean userGenerated;
+  public long time;
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAmountSamplerOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAmountSamplerOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAmountSamplerOperator.java
new file mode 100644
index 0000000..235e36e
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAmountSamplerOperator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * An operator to alert in case a transaction of a small lowAmount is followed by a transaction which is significantly larger for a given credit card number.
+ * This is done for each transaction. This also means that this happens for each individual credit card.
+ * It accepts merchant transaction object and for each CC listed in the transaction(s), checks for the transaction amounts. An alert is raised if the transaction
+ * lowAmount is significantly > the lowest amt in this window.
+ *
+ * @since 0.9.0
+ */
+public class CreditCardAmountSamplerOperator extends BaseOperator
+{
+  private final transient JsonFactory jsonFactory = new JsonFactory();
+  private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory);
+  private static final Logger logger = LoggerFactory.getLogger(Application.class);
+  // Factor to be applied to existing lowAmount to flag potential alerts.
+  private double threshold = 9500;
+  private Map<String, CreditCardInfo> ccTxnMap = new HashMap<String, CreditCardInfo>();
+  //private Map<String, MutableLong> ccQueryTxnMap = new HashMap<String, MutableLong>();
+  private List<CreditCardAlertData> alerts = new ArrayList<CreditCardAlertData>();
+  //private List<CreditCardAlertData> userAlerts = new ArrayList<CreditCardAlertData>();
+  private static final String ALERT_MSG =
+      "Potential fraudulent CC transactions (small one USD %d followed by large USD %d) performed using credit card: %s";
+  public final transient DefaultOutputPort<String> ccAlertOutputPort = new DefaultOutputPort<String>();
+  /*
+   public final transient DefaultOutputPort<Map<String, Object>> ccUserAlertOutputPort = new DefaultOutputPort<Map<String, Object>>();
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> ccAlertNotificationPort = new DefaultOutputPort<Map<String, Object>>();
+
+  public double getThreshold()
+  {
+    return threshold;
+  }
+
+  public void setThreshold(double threshold)
+  {
+    this.threshold = threshold;
+  }
+
+  private void processTuple(KeyValPair<MerchantKey, CreditCardData> tuple, Map<String, CreditCardInfo> txMap)
+  {
+    String fullCcNum = tuple.getValue().fullCcNum;
+    long ccAmount = tuple.getValue().amount;
+    MerchantKey key = tuple.getKey();
+
+    CreditCardInfo cardInfo = txMap.get(fullCcNum);
+
+    if (cardInfo != null) {
+      long currentSmallValue = cardInfo.lowAmount.longValue();
+      if (ccAmount < currentSmallValue) {
+        cardInfo.lowAmount.setValue(ccAmount);
+        cardInfo.time = key.time;
+      } else if (ccAmount > (currentSmallValue + threshold)) {
+        // If the transaction lowAmount is > 70% of the min. lowAmount, send an alert.
+
+        CreditCardAlertData data = new CreditCardAlertData();
+
+        data.merchantId = key.merchantId;
+        data.terminalId = key.terminalId == null ? 0 : key.terminalId;
+        data.zipCode = key.zipCode;
+        data.merchantType = key.merchantType;
+        data.fullCcNum = fullCcNum;
+        data.small = currentSmallValue;
+        data.large = ccAmount;
+        data.threshold = threshold;
+        data.userGenerated = key.userGenerated;
+        data.time = System.currentTimeMillis();
+
+        alerts.add(data);
+
+        /*
+         if (userGenerated){
+         userAlerts.add(data);
+         }
+         */
+        ccAlertNotificationPort.emit(getOutputData(data));
+
+        // Any high value transaction after a low value transaction with difference greater than threshold
+        // will trigger the alert. Not resetting the low value also helps in a system generated transaction
+        // alert not resetting the low value from a user generated transaction
+        //txMap.remove(fullCcNum);
+      }
+    } else {
+      cardInfo = new CreditCardInfo();
+      cardInfo.lowAmount.setValue(ccAmount);
+      cardInfo.time = key.time;
+      txMap.put(fullCcNum, cardInfo);
+    }
+  }
+
+  public transient DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>> inputPort =
+      new DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>>()
+  {
+    //
+    // This function checks if a CC entry exists.
+    // If so, it checks whether the current transaction is for an lowAmount lesser than the one stored in the hashmap. If so, this becomes the min. transaction lowAmount.
+    // If the lowAmount is > 70% of the existing lowAmount in the hash map, raise an alert.
+    //
+    @Override
+    public void process(KeyValPair<MerchantKey, CreditCardData> tuple)
+    {
+
+      processTuple(tuple, ccTxnMap);
+
+    }
+
+  };
+
+  @Override
+  public void endWindow()
+  {
+
+    for (CreditCardAlertData data : alerts) {
+      try {
+        ccAlertOutputPort.emit(JsonUtils.toJson(data));
+      } catch (IOException e) {
+        logger.warn("Exception while converting object to JSON", e);
+      }
+    }
+
+    //for (CreditCardAlertData data: userAlerts) {
+       /*for (CreditCardAlertData data: alerts) {
+     ccAlertNotificationPort.emit(getOutputData(data));
+     }*/
+
+    long ctime = System.currentTimeMillis();
+    Iterator<Map.Entry<String, CreditCardInfo>> iterator = ccTxnMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<String, CreditCardInfo> entry = iterator.next();
+      long time = entry.getValue().time;
+      if ((ctime - time) > 60000) {
+        iterator.remove();
+      }
+    }
+
+    //ccTxnMap.clear();
+    alerts.clear();
+
+    //ccQueryTxnMap.clear();
+    //userAlerts.clear();
+  }
+
+  private static class CreditCardInfo
+  {
+    MutableLong lowAmount = new MutableLong();
+    Long time;
+  }
+
+  private Map<String, Object> getOutputData(CreditCardAlertData data)
+  {
+    Map<String, Object> output = new HashMap<String, Object>();
+    output.put("message", String.format(ALERT_MSG, data.small, data.large, data.fullCcNum));
+    output.put("alertType", "smallThenLarge");
+    output.put("userGenerated", "" + data.userGenerated);
+    output.put("alertData", data);
+
+    try {
+      String str = mapper.writeValueAsString(output);
+      logger.debug("Alert generated: " + str + " userGenerated: " + data.userGenerated);
+    } catch (Exception exc) {
+      //ignore
+    }
+
+    return output;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java
new file mode 100644
index 0000000..7c667d6
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * Credit Card Data
+ *
+ * @since 0.9.0
+ */
+public class CreditCardData
+{
+  public String fullCcNum;
+  public long amount;
+
+  public CreditCardData()
+  {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java
new file mode 100644
index 0000000..d73c693
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+
+import java.io.Serializable;
+
+/**
+ * A time-based key for merchant data.
+ *
+ * @since 0.9.0
+ */
+public class MerchantKey implements Serializable
+{
+  public String merchantId;
+  public Integer terminalId;
+  public Integer zipCode;
+  public String country;
+  public MerchantTransaction.MerchantType merchantType;
+  public Long time;
+  public boolean userGenerated;
+
+  public MerchantKey()
+  {
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int key = 0;
+    if (merchantId != null) {
+      key |= (1 << 1);
+      key |= (merchantId.hashCode());
+    }
+    if (terminalId != null) {
+      key |= (1 << 2);
+      key |= (terminalId << 2);
+    }
+    if (zipCode != null) {
+      key |= (1 << 3);
+      key |= (zipCode << 3);
+    }
+    if (country != null) {
+      key |= (1 << 4);
+      key |= (country.hashCode());
+    }
+    if (merchantType != null) {
+      key |= (1 << 5);
+      key |= (merchantType.hashCode());
+    }
+    return key;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (!(obj instanceof MerchantKey)) {
+      return false;
+    }
+    MerchantKey mkey = (MerchantKey)obj;
+    return checkStringEqual(this.merchantId, mkey.merchantId)
+            && checkIntEqual(this.terminalId, mkey.terminalId)
+            && checkIntEqual(this.zipCode, mkey.zipCode)
+            && checkStringEqual(this.country, mkey.country)
+            && checkIntEqual(this.merchantType.ordinal(), mkey.merchantType.ordinal());
+  }
+
+  private boolean checkIntEqual(Integer a, Integer b)
+  {
+    if ((a == null) && (b == null)) {
+      return true;
+    }
+    if ((a != null) && (b != null) && a.intValue() == b.intValue()) {
+      return true;
+    }
+    return false;
+  }
+
+  private boolean checkStringEqual(String a, String b)
+  {
+    if ((a == null) && (b == null)) {
+      return true;
+    }
+    if ((a != null) && a.equals(b)) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    if (merchantId != null) {
+      sb.append("|1:").append(merchantId);
+    }
+    if (terminalId != null) {
+      sb.append("|2:").append(terminalId);
+    }
+    if (zipCode != null) {
+      sb.append("|3:").append(zipCode);
+    }
+    if (country != null) {
+      sb.append("|4:").append(country);
+    }
+    if (merchantType != null) {
+      sb.append("|5:").append(merchantType);
+    }
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java
new file mode 100644
index 0000000..e6a4680
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.util.Map;
+
+/**
+ * Common utility class that can be used by all other operators to handle user input
+ * captured from the Web socket input port.
+ *
+ * @since 0.9.0
+ */
+public class MerchantQueryInputHandler
+{
+  public static final String KEY_DATA = "data";
+  public static final String KEY_MERCHANT_ID = "merchantId";
+  public static final String KEY_TERMINAL_ID = "terminalId";
+  public static final String KEY_ZIP_CODE = "zipCode";
+
+  public static MerchantKey process(Map<String, Object> tuple)
+  {
+    String merchantId = null;
+    Integer terminalId = null;
+    Integer zipCode = null;
+
+    // ignoring other top-level attributes.
+    Map<String, Object> data = (Map<String, Object>)tuple.get(KEY_DATA);
+    if (data.get(KEY_MERCHANT_ID) != null) {
+      merchantId = (String)data.get(KEY_MERCHANT_ID);
+    }
+    if (data.get(KEY_TERMINAL_ID) != null) {
+      terminalId = (Integer)data.get(KEY_TERMINAL_ID);
+    }
+    if (data.get(KEY_ZIP_CODE) != null) {
+      zipCode = (Integer)data.get(KEY_ZIP_CODE);
+    }
+
+    MerchantKey key = new MerchantKey();
+    key.merchantId = merchantId;
+    key.terminalId = terminalId;
+    key.zipCode = zipCode;
+    key.country = "USA";
+    if (merchantId != null) {
+      key.merchantType = key.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[2])
+              || key.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[3])
+                         ? MerchantTransaction.MerchantType.INTERNET
+                         : MerchantTransaction.MerchantType.BRICK_AND_MORTAR;
+    }
+    return key;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java
new file mode 100644
index 0000000..a722492
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.Serializable;
+
+/**
+ * POJO for BIN Alert related data.
+ *
+ * @since 0.9.0
+ */
+public class MerchantTransaction implements Serializable
+{
+  public enum MerchantType
+  {
+    UNDEFINED, BRICK_AND_MORTAR, INTERNET
+  }
+
+  public enum TransactionType
+  {
+    UNDEFINED, POS
+  }
+
+  public String ccNum;
+  public String bankIdNum;
+  public String fullCcNum;
+  public Long amount;
+  public String merchantId;
+  public Integer terminalId;
+  public Integer zipCode;
+  public String country;
+  public MerchantType merchantType = MerchantType.UNDEFINED;
+  public TransactionType transactionType = TransactionType.UNDEFINED;
+  public Long time;
+  public boolean userGenerated;
+
+  public MerchantTransaction()
+  {
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int key = 0;
+    if (ccNum != null) {
+      key |= (1 << 1);
+      key |= (ccNum.hashCode());
+    }
+    if (bankIdNum != null) {
+      key |= (1 << 2);
+      key |= (bankIdNum.hashCode());
+    }
+    if (amount != null) {
+      key |= (1 << 6);
+      key |= (amount << 4);
+    }
+    if (merchantId != null) {
+      key |= (1 << 3);
+      key |= (merchantId.hashCode());
+    }
+    if (terminalId != null) {
+      key |= (1 << 4);
+      key |= (terminalId << 2);
+    }
+    if (zipCode != null) {
+      key |= (1 << 5);
+      key |= (zipCode << 3);
+    }
+    if (country != null) {
+      key |= (1 << 7);
+      key |= (country.hashCode());
+    }
+    if (merchantType != null) {
+      key |= (1 << 8);
+      key |= (merchantType.hashCode());
+    }
+    if (transactionType != null) {
+      key |= (1 << 9);
+      key |= (transactionType.hashCode());
+    }
+    if (fullCcNum != null) {
+      key |= (1 << 10);
+      key |= (fullCcNum.hashCode());
+    }
+    if (time != null) {
+      key |= (1 << 11);
+      key |= (time << 2);
+    }
+
+    return key;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (!(obj instanceof MerchantTransaction)) {
+      return false;
+    }
+    MerchantTransaction mtx = (MerchantTransaction)obj;
+    return checkStringEqual(this.ccNum, mtx.ccNum)
+            && checkStringEqual(this.bankIdNum, mtx.bankIdNum)
+            && checkLongEqual(this.amount, mtx.amount)
+            && checkStringEqual(this.merchantId, mtx.merchantId)
+            && checkIntEqual(this.terminalId, mtx.terminalId)
+            && checkIntEqual(this.zipCode, mtx.zipCode)
+            && checkStringEqual(this.country, mtx.country)
+            && checkIntEqual(this.merchantType.ordinal(), mtx.merchantType.ordinal())
+            && checkIntEqual(this.transactionType.ordinal(), mtx.transactionType.ordinal())
+            && checkStringEqual(this.fullCcNum, mtx.fullCcNum)
+            && checkLongEqual(this.time, mtx.time);
+  }
+
+  private boolean checkIntEqual(Integer a, Integer b)
+  {
+    if ((a == null) && (b == null)) {
+      return true;
+    }
+    if ((a != null) && (b != null) && a.intValue() == b.intValue()) {
+      return true;
+    }
+    return false;
+  }
+
+  private boolean checkLongEqual(Long a, Long b)
+  {
+    if ((a == null) && (b == null)) {
+      return true;
+    }
+    if ((a != null) && (b != null) && a.longValue() == b.longValue()) {
+      return true;
+    }
+    return false;
+  }
+
+  private boolean checkStringEqual(String a, String b)
+  {
+    if ((a == null) && (b == null)) {
+      return true;
+    }
+    if ((a != null) && a.equals(b)) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    if (ccNum != null) {
+      sb.append("|0:").append(ccNum);
+    }
+    if (bankIdNum != null) {
+      sb.append("|1:").append(bankIdNum);
+    }
+    if (fullCcNum != null) {
+      sb.append("|2:").append(fullCcNum);
+    }
+    if (amount != null) {
+      sb.append("|3:").append(amount);
+    }
+    if (merchantId != null) {
+      sb.append("|4:").append(merchantId);
+    }
+    if (terminalId != null) {
+      sb.append("|5:").append(terminalId);
+    }
+    if (zipCode != null) {
+      sb.append("|6:").append(zipCode);
+    }
+    if (country != null) {
+      sb.append("|7:").append(country);
+    }
+    if (merchantType != null) {
+      sb.append("|8:").append(merchantType);
+    }
+    if (transactionType != null) {
+      sb.append("|9:").append(transactionType);
+    }
+    if (time != null) {
+      sb.append("|10:").append(time);
+    }
+    return sb.toString();
+  }
+
+}