You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:19 UTC
[14/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java
new file mode 100644
index 0000000..34a2c56
--- /dev/null
+++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * @since 2.1.0
+ */
+public class MessageReceiver implements InputOperator, NetworkManager.ChannelListener<DatagramChannel>
+{
+ private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
+
+ private transient NetworkManager.ChannelAction<DatagramChannel> action;
+
+ //Need the sender info, using a packet for now instead of the buffer
+ private transient ByteBuffer buffer;
+ //private transient DatagramPacket packet;
+
+ private int port = 9000;
+ private int maxMesgSize = 512;
+ private int inactiveWait = 10;
+ private boolean readReady = false;
+
+ @Override
+ public void emitTuples()
+ {
+ boolean emitData = false;
+ if (readReady) {
+ //DatagramSocket socket = action.channelConfiguration.socket;
+ try {
+ //socket.receive(packet);
+ DatagramChannel channel = action.channelConfiguration.channel;
+ SocketAddress address = channel.receive(buffer);
+ if (address != null) {
+ /*
+ StringBuilder sb = new StringBuilder();
+ buffer.rewind();
+ while (buffer.hasRemaining()) {
+ sb.append(buffer.getChar());
+ }
+ String mesg = sb.toString();
+ */
+ buffer.flip();
+ String mesg = new String(buffer.array(), 0, buffer.limit());
+ logger.info("Message {}", mesg);
+ Message message = new Message();
+ message.message = mesg;
+ message.socketAddress = address;
+ messageOutput.emit(message);
+ emitData = true;
+ buffer.clear();
+ }
+ //String mesg = new String(packet.getData(), packet.getOffset(), packet.getLength());
+ } catch (IOException e) {
+ throw new RuntimeException("Error reading from channel", e);
+ }
+ // Even if we miss a readReady because of not locking we will get it again immediately
+ readReady = false;
+ }
+ if (!emitData) {
+ synchronized (buffer) {
+ try {
+ if (!readReady) {
+ buffer.wait(inactiveWait);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void beginWindow(long l)
+ {
+
+ }
+
+ @Override
+ public void endWindow()
+ {
+
+ }
+
+ public final transient DefaultOutputPort<Message> messageOutput = new DefaultOutputPort<Message>();
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ try {
+ //byte[] mesgData = new byte[maxMesgSize];
+ //packet = new DatagramPacket(mesgData, maxMesgSize);
+ buffer = ByteBuffer.allocate(maxMesgSize);
+ action = NetworkManager.getInstance().registerAction(port, NetworkManager.ConnectionType.UDP, this, SelectionKey.OP_READ);
+ } catch (IOException e) {
+ throw new RuntimeException("Error initializing receiver", e);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ NetworkManager.getInstance().unregisterAction(action);
+ } catch (Exception e) {
+ throw new RuntimeException("Error shutting down receiver", e);
+ }
+ }
+
+ @Override
+ public void ready(NetworkManager.ChannelAction<DatagramChannel> action, int readyOps)
+ {
+ synchronized (buffer) {
+ readReady = true;
+ buffer.notify();
+ }
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public void setPort(int port)
+ {
+ this.port = port;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java
new file mode 100644
index 0000000..d34bf3e
--- /dev/null
+++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * @since 2.1.0
+ */
+public class MessageResponder extends BaseOperator
+{
+
+ private String responseHeader = "Response: ";
+
+ private int port = 9000;
+ private int maxMesgSize = 512;
+ private transient NetworkManager.ChannelAction<DatagramChannel> action;
+ private transient ByteBuffer buffer;
+
+ public final transient DefaultInputPort<Message> messageInput = new DefaultInputPort<Message>()
+ {
+ @Override
+ public void process(Message message)
+ {
+ String sendMesg = responseHeader + message.message;
+ SocketAddress address = message.socketAddress;
+ buffer.put(sendMesg.getBytes());
+ buffer.flip();
+ try {
+ action.channelConfiguration.channel.send(buffer, address);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ buffer.clear();
+ }
+ };
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ try {
+ buffer = ByteBuffer.allocate(maxMesgSize);
+ action = NetworkManager.getInstance().registerAction(port, NetworkManager.ConnectionType.UDP, null, 0);
+ } catch (IOException e) {
+ throw new RuntimeException("Error initializer responder", e);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ NetworkManager.getInstance().unregisterAction(action);
+ } catch (Exception e) {
+ throw new RuntimeException("Error shutting down responder", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java
new file mode 100644
index 0000000..88cf621
--- /dev/null
+++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since 2.1.0
+ */
+public class NetworkManager implements Runnable
+{
+ private static final Logger logger = LoggerFactory.getLogger(NetworkManager.class);
+
+ public static enum ConnectionType
+ {
+ TCP,
+ UDP
+ }
+
+ private static NetworkManager _instance;
+ private Selector selector;
+
+ private volatile boolean doRun = false;
+ private Thread selThread;
+ private long selTimeout = 1000;
+ private volatile Exception selEx;
+
+ private Map<ConnectionInfo, ChannelConfiguration> channels;
+ private Map<SelectableChannel, ChannelConfiguration> channelConfigurations;
+
+ public static NetworkManager getInstance() throws IOException
+ {
+ if (_instance == null) {
+ synchronized (NetworkManager.class) {
+ if (_instance == null) {
+ _instance = new NetworkManager();
+ }
+ }
+ }
+ return _instance;
+ }
+
+ private NetworkManager() throws IOException
+ {
+ channels = new HashMap<ConnectionInfo, ChannelConfiguration>();
+ channelConfigurations = new HashMap<SelectableChannel, ChannelConfiguration>();
+ }
+
+ public synchronized <T extends SelectableChannel> ChannelAction<T> registerAction(int port, ConnectionType type, ChannelListener<T> listener, int ops) throws IOException
+ {
+ boolean startProc = (channels.size() == 0);
+ SelectableChannel channel = null;
+ SocketAddress address = new InetSocketAddress(port);
+ ConnectionInfo connectionInfo = new ConnectionInfo();
+ connectionInfo.address = address;
+ connectionInfo.connectionType = type;
+ ChannelConfiguration channelConfiguration = channels.get(connectionInfo);
+ if (channelConfiguration == null) {
+ Object socket = null;
+ if (type == ConnectionType.TCP) {
+ SocketChannel schannel = SocketChannel.open();
+ schannel.configureBlocking(false);
+ Socket ssocket = schannel.socket();
+ ssocket.bind(address);
+ socket = ssocket;
+ channel = schannel;
+ } else if (type == ConnectionType.UDP) {
+ DatagramChannel dchannel = DatagramChannel.open();
+ dchannel.configureBlocking(false);
+ DatagramSocket dsocket = dchannel.socket();
+ dsocket.bind(address);
+ socket = dsocket;
+ channel = dchannel;
+ }
+ if (channel == null) {
+ throw new IOException("Unsupported connection type");
+ }
+ channelConfiguration = new ChannelConfiguration();
+ channelConfiguration.actions = new ConcurrentLinkedQueue<ChannelAction>();
+ channelConfiguration.channel = channel;
+ channelConfiguration.connectionInfo = connectionInfo;
+ channels.put(connectionInfo, channelConfiguration);
+ channelConfigurations.put(channel, channelConfiguration);
+ } else {
+ channel = channelConfiguration.channel;
+ }
+ ChannelAction channelAction = new ChannelAction();
+ channelAction.channelConfiguration = channelConfiguration;
+ channelAction.listener = listener;
+ channelAction.ops = ops;
+ channelConfiguration.actions.add(channelAction);
+ if (startProc) {
+ startProcess();
+ }
+ if (listener != null) {
+ channel.register(selector, ops);
+ }
+ return channelAction;
+ }
+
+ public synchronized void unregisterAction(ChannelAction action) throws IOException, InterruptedException
+ {
+ ChannelConfiguration channelConfiguration = action.channelConfiguration;
+ SelectableChannel channel = channelConfiguration.channel;
+ if (channelConfiguration != null) {
+ channelConfiguration.actions.remove(action);
+ if (channelConfiguration.actions.size() == 0) {
+ ConnectionInfo connectionInfo = channelConfiguration.connectionInfo;
+ channelConfigurations.remove(channel);
+ channels.remove(connectionInfo);
+ channel.close();
+ }
+ }
+ if (channels.size() == 0) {
+ stopProcess();
+ }
+ }
+
+ private void startProcess() throws IOException
+ {
+ selector = Selector.open();
+ doRun = true;
+ selThread = new Thread(this);
+ selThread.start();
+ }
+
+ private void stopProcess() throws InterruptedException, IOException
+ {
+ doRun = false;
+ selThread.join();
+ selector.close();
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ while (doRun) {
+ int keys = selector.select(selTimeout);
+ if (keys > 0) {
+ Set<SelectionKey> selectionKeys = selector.selectedKeys();
+ for (SelectionKey selectionKey : selectionKeys) {
+ int readyOps = selectionKey.readyOps();
+ ChannelConfiguration channelConfiguration = channelConfigurations.get(selectionKey.channel());
+ Collection<ChannelAction> actions = channelConfiguration.actions;
+ for (ChannelAction action : actions) {
+ if (((readyOps & action.ops) != 0) && (action.listener != null)) {
+ action.listener.ready(action, readyOps);
+ }
+ }
+ }
+ selectionKeys.clear();
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Error in select", e);
+ selEx = e;
+ }
+ }
+
+ public static interface ChannelListener<T extends SelectableChannel>
+ {
+ public void ready(ChannelAction<T> action, int readyOps);
+ }
+
+ public static class ChannelConfiguration<T extends SelectableChannel>
+ {
+ public T channel;
+ public ConnectionInfo connectionInfo;
+ public Collection<ChannelAction> actions;
+ }
+
+ public static class ChannelAction<T extends SelectableChannel>
+ {
+ public ChannelConfiguration<T> channelConfiguration;
+ public ChannelListener<T> listener;
+ public int ops;
+ }
+
+ private static class ConnectionInfo
+ {
+ public SocketAddress address;
+ public ConnectionType connectionType;
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ConnectionInfo that = (ConnectionInfo)o;
+
+ if (connectionType != that.connectionType) {
+ return false;
+ }
+ if (!address.equals(that.address)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = address.hashCode();
+ result = 31 * result + connectionType.hashCode();
+ return result;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/resources/META-INF/properties.xml b/examples/echoserver/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..0822f4f
--- /dev/null
+++ b/examples/echoserver/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!--
+ <property>
+ <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+ <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+ </property>
+ -->
+ <property>
+ <name>dt.attr.MASTER_MEMORY_MB</name>
+ <value>1024</value>
+ </property>
+ <property>
+ <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
+ <value>hello world: %s</value>
+ </property>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java b/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java
new file mode 100644
index 0000000..c4a3ad4
--- /dev/null
+++ b/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest
+{
+
+ @Test
+ public void testApplication() throws IOException, Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10000); // runs for 10 seconds and quits
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/test/resources/log4j.properties b/examples/echoserver/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/echoserver/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/pom.xml
----------------------------------------------------------------------
diff --git a/examples/frauddetect/pom.xml b/examples/frauddetect/pom.xml
new file mode 100644
index 0000000..36d4153
--- /dev/null
+++ b/examples/frauddetect/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>malhar-examples-frauddetect</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar Fraud Detect Example</name>
+ <description>Apex example application that demonstrates real-time pattern detection in the incoming data and alerting. The example processes streaming credit card transactions and looks for fraudulent transactions.</description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <skipTests>true</skipTests>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>2.10.1</version>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/assemble/appPackage.xml b/examples/frauddetect/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/frauddetect/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
new file mode 100644
index 0000000..73c38ef
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.Serializable;
+import java.net.URI;
+import org.apache.apex.examples.frauddetect.operator.HdfsStringOutputOperator;
+import org.apache.apex.examples.frauddetect.operator.MongoDBOutputOperator;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+import com.datatorrent.lib.math.RangeKeyVal;
+import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
+import com.datatorrent.lib.util.BaseKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.DTThrowable;
+
+
+/**
+ * Fraud detection application
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "FraudDetectExample")
+public class Application implements StreamingApplication
+{
+
+ public PubSubWebSocketInputOperator getPubSubWebSocketInputOperator(String name, DAG dag, URI duri, String topic) throws Exception
+ {
+ PubSubWebSocketInputOperator reqin = dag.addOperator(name, new PubSubWebSocketInputOperator());
+ reqin.setUri(duri);
+ reqin.setTopic(topic);
+ return reqin;
+ }
+
+ public PubSubWebSocketOutputOperator getPubSubWebSocketOutputOperator(String name, DAG dag, URI duri, String topic) throws Exception
+ {
+ PubSubWebSocketOutputOperator out = dag.addOperator(name, new PubSubWebSocketOutputOperator());
+ out.setUri(duri);
+ return out;
+ }
+
+ public HdfsStringOutputOperator getHdfsOutputOperator(String name, DAG dag, String folderName)
+ {
+ HdfsStringOutputOperator oper = dag.addOperator("hdfs", HdfsStringOutputOperator.class);
+ oper.setFilePath(folderName);
+ oper.setMaxLength(1024 * 1024 * 1024);
+ return oper;
+ }
+
+ public ConsoleOutputOperator getConsoleOperator(String name, DAG dag, String prefix, String format)
+ {
+ ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
+ // oper.setStringFormat(prefix + ": " + format);
+ return oper;
+ }
+
+ public static class KeyPartitionCodec<K, V> extends BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable
+ {
+ private static final long serialVersionUID = 201410031623L;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+
+ try {
+ String gatewayAddress = dag.getValue(DAGContext.GATEWAY_CONNECT_ADDRESS);
+ if (gatewayAddress == null) {
+ gatewayAddress = "localhost:9090";
+ }
+ URI duri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+ PubSubWebSocketInputOperator userTxWsInput = getPubSubWebSocketInputOperator("userTxInput", dag, duri, "examples.app.frauddetect.submitTransaction");
+ PubSubWebSocketOutputOperator ccUserAlertWsOutput = getPubSubWebSocketOutputOperator("ccUserAlertQueryOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
+ PubSubWebSocketOutputOperator avgUserAlertwsOutput = getPubSubWebSocketOutputOperator("avgUserAlertQueryOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
+ PubSubWebSocketOutputOperator binUserAlertwsOutput = getPubSubWebSocketOutputOperator("binUserAlertOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
+ PubSubWebSocketOutputOperator txSummaryWsOutput = getPubSubWebSocketOutputOperator("txSummaryWsOutput", dag, duri, "examples.app.frauddetect.txSummary");
+ SlidingWindowSumKeyVal<KeyValPair<MerchantKey, String>, Integer> smsOperator = dag.addOperator("movingSum", SlidingWindowSumKeyVal.class);
+
+ MerchantTransactionGenerator txReceiver = dag.addOperator("txReceiver", MerchantTransactionGenerator.class);
+ MerchantTransactionInputHandler txInputHandler = dag.addOperator("txInputHandler", new MerchantTransactionInputHandler());
+ BankIdNumberSamplerOperator binSampler = dag.addOperator("bankInfoFraudDetector", BankIdNumberSamplerOperator.class);
+
+ MerchantTransactionBucketOperator txBucketOperator = dag.addOperator("txFilter", MerchantTransactionBucketOperator.class);
+ RangeKeyVal rangeOperator = dag.addOperator("rangePerMerchant", new RangeKeyVal<MerchantKey, Long>());
+ SimpleMovingAverage<MerchantKey, Long> smaOperator = dag.addOperator("smaPerMerchant", SimpleMovingAverage.class);
+ TransactionStatsAggregator txStatsAggregator = dag.addOperator("txStatsAggregator", TransactionStatsAggregator.class);
+ AverageAlertingOperator avgAlertingOperator = dag.addOperator("avgAlerter", AverageAlertingOperator.class);
+ CreditCardAmountSamplerOperator ccSamplerOperator = dag.addOperator("amountFraudDetector", CreditCardAmountSamplerOperator.class);
+ HdfsStringOutputOperator hdfsOutputOperator = getHdfsOutputOperator("hdfsOutput", dag, "fraud");
+
+ MongoDBOutputOperator mongoTxStatsOperator = dag.addOperator("mongoTxStatsOutput", MongoDBOutputOperator.class);
+ MongoDBOutputOperator mongoBinAlertsOperator = dag.addOperator("mongoBinAlertsOutput", MongoDBOutputOperator.class);
+ MongoDBOutputOperator mongoCcAlertsOperator = dag.addOperator("mongoCcAlertsOutput", MongoDBOutputOperator.class);
+ MongoDBOutputOperator mongoAvgAlertsOperator = dag.addOperator("mongoAvgAlertsOutput", MongoDBOutputOperator.class);
+
+ dag.addStream("userTxStream", userTxWsInput.outputPort, txInputHandler.userTxInputPort);
+ dag.addStream("transactions", txReceiver.txOutputPort, txBucketOperator.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
+ dag.addStream("txData", txReceiver.txDataOutputPort, hdfsOutputOperator.input); // dump all tx into Hdfs
+ dag.addStream("userTransactions", txInputHandler.txOutputPort, txBucketOperator.txUserInputPort);
+ dag.addStream("bankInfoData", txBucketOperator.binCountOutputPort, smsOperator.data);
+ dag.addStream("bankInfoCount", smsOperator.integerSum, binSampler.txCountInputPort);
+ dag.addStream("filteredTransactions", txBucketOperator.txOutputPort, rangeOperator.data, smaOperator.data, avgAlertingOperator.txInputPort);
+
+ KeyPartitionCodec<MerchantKey, Long> txCodec = new KeyPartitionCodec<MerchantKey, Long>();
+ dag.setInputPortAttribute(rangeOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
+ dag.setInputPortAttribute(smaOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
+ dag.setInputPortAttribute(avgAlertingOperator.txInputPort, Context.PortContext.STREAM_CODEC, txCodec);
+
+ dag.addStream("creditCardData", txBucketOperator.ccAlertOutputPort, ccSamplerOperator.inputPort);
+ dag.addStream("txnSummaryData", txBucketOperator.summaryTxnOutputPort, txSummaryWsOutput.input);
+ dag.addStream("smaAlerts", smaOperator.doubleSMA, avgAlertingOperator.smaInputPort);
+ dag.addStream("binAlerts", binSampler.countAlertOutputPort, mongoBinAlertsOperator.inputPort);
+ dag.addStream("binAlertsNotification", binSampler.countAlertNotificationPort, binUserAlertwsOutput.input);
+ dag.addStream("rangeData", rangeOperator.range, txStatsAggregator.rangeInputPort);
+ dag.addStream("smaData", smaOperator.longSMA, txStatsAggregator.smaInputPort);
+ dag.addStream("txStatsOutput", txStatsAggregator.txDataOutputPort, mongoTxStatsOperator.inputPort);
+ dag.addStream("avgAlerts", avgAlertingOperator.avgAlertOutputPort, mongoAvgAlertsOperator.inputPort);
+ dag.addStream("avgAlertsNotification", avgAlertingOperator.avgAlertNotificationPort, avgUserAlertwsOutput.input);
+ dag.addStream("ccAlerts", ccSamplerOperator.ccAlertOutputPort, mongoCcAlertsOperator.inputPort);
+ dag.addStream("ccAlertsNotification", ccSamplerOperator.ccAlertNotificationPort, ccUserAlertWsOutput.input);
+
+ } catch (Exception exc) {
+ DTThrowable.rethrow(exc);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java
new file mode 100644
index 0000000..6aca64d
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * POJO to capture average alert data.
+ *
+ * @since 0.9.0
+ */
+public class AverageAlertData
+{
+ public String merchantId;
+ public int terminalId;
+ public int zipCode;
+ public MerchantTransaction.MerchantType merchantType;
+ public long amount;
+ public double lastSmaValue;
+ public double change;
+ public boolean userGenerated;
+ public long time;
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertingOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertingOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertingOperator.java
new file mode 100644
index 0000000..1b1b64a
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertingOperator.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+import org.apache.commons.lang.mutable.MutableDouble;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Generate an alert if the current transaction amount received on tx input port for the given key is greater by n %
+ * than the SMA of the last application window as received on the SMA input port.
+ *
+ * @since 0.9.0
+ */
+public class AverageAlertingOperator extends BaseOperator
+{
+ private static final Logger Log = LoggerFactory.getLogger(AverageAlertingOperator.class);
+ private final transient JsonFactory jsonFactory = new JsonFactory();
+ private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory);
+ private Map<MerchantKey, MutableDouble> lastSMAMap = new HashMap<MerchantKey, MutableDouble>();
+ private Map<MerchantKey, MutableDouble> currentSMAMap = new HashMap<MerchantKey, MutableDouble>();
+ private List<AverageAlertData> alerts = new ArrayList<AverageAlertData>();
+ @NotNull
+ private int threshold;
+ private static final String brickMortarAlertMsg = "Transaction amount %d exceeded by %f (last SMA %f) for Merchant %s at Terminal %d!";
+ private static final String internetAlertMsg = "Transaction amount %d exceeded by %f (last SMA %f) for Merchant %s!";
+ public final transient DefaultOutputPort<String> avgAlertOutputPort = new DefaultOutputPort<String>();
+ public final transient DefaultOutputPort<Map<String, Object>> avgAlertNotificationPort = new DefaultOutputPort<Map<String, Object>>();
+ public final transient DefaultInputPort<KeyValPair<MerchantKey, Double>> smaInputPort =
+ new DefaultInputPort<KeyValPair<MerchantKey, Double>>()
+ {
+ @Override
+ public void process(KeyValPair<MerchantKey, Double> tuple)
+ {
+ MutableDouble currentSma = currentSMAMap.get(tuple.getKey());
+ if (currentSma == null) { // first sma for the given key
+ double sma = tuple.getValue();
+ currentSMAMap.put(tuple.getKey(), new MutableDouble(sma));
+ //lastSMAMap.put(tuple.getKey(), new MutableDouble(sma));
+ } else { // move the current SMA value to the last SMA Map
+ //lastSMAMap.get(tuple.getKey()).setValue(currentSma.getValue());
+ currentSma.setValue(tuple.getValue()); // update the current SMA value
+ }
+ }
+
+ };
+ public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> txInputPort =
+ new DefaultInputPort<KeyValPair<MerchantKey, Long>>()
+ {
+ @Override
+ public void process(KeyValPair<MerchantKey, Long> tuple)
+ {
+ processTuple(tuple);
+ }
+
+ };
+
+ private void processTuple(KeyValPair<MerchantKey, Long> tuple)
+ {
+ MerchantKey merchantKey = tuple.getKey();
+ MutableDouble lastSma = lastSMAMap.get(tuple.getKey());
+ long txValue = tuple.getValue();
+ if (lastSma != null && txValue > lastSma.doubleValue()) {
+ double lastSmaValue = lastSma.doubleValue();
+ double change = txValue - lastSmaValue;
+ if (change > threshold) { // generate an alert
+ AverageAlertData data = getOutputData(merchantKey, txValue, change, lastSmaValue);
+ alerts.add(data);
+ //if (userGenerated) { // if its user generated only the pass it to WebSocket
+ if (merchantKey.merchantType == MerchantTransaction.MerchantType.BRICK_AND_MORTAR) {
+ avgAlertNotificationPort.emit(getOutputData(data, String.format(brickMortarAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId, merchantKey.terminalId)));
+ } else { // its internet based
+ avgAlertNotificationPort.emit(getOutputData(data, String.format(internetAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId)));
+
+ }
+ //}
+ }
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ for (AverageAlertData data : alerts) {
+ try {
+ avgAlertOutputPort.emit(JsonUtils.toJson(data));
+ } catch (IOException e) {
+ logger.warn("Exception while converting object to JSON", e);
+ }
+ }
+
+ alerts.clear();
+
+ for (Map.Entry<MerchantKey, MutableDouble> entry : currentSMAMap.entrySet()) {
+ MerchantKey key = entry.getKey();
+ MutableDouble currentSma = entry.getValue();
+ MutableDouble lastSma = lastSMAMap.get(key);
+ if (lastSma == null) {
+ lastSma = new MutableDouble(currentSma.doubleValue());
+ lastSMAMap.put(key, lastSma);
+ } else {
+ lastSma.setValue(currentSma.getValue());
+ }
+ }
+ }
+
+ private AverageAlertData getOutputData(MerchantKey key, long amount, double change, double lastSmaValue)
+ {
+ AverageAlertData data = new AverageAlertData();
+
+ data.merchantId = key.merchantId;
+ data.terminalId = key.terminalId == null ? 0 : key.terminalId;
+ data.zipCode = key.zipCode;
+ data.merchantType = key.merchantType;
+ data.amount = amount;
+ data.lastSmaValue = lastSmaValue;
+ data.change = change;
+ //data.userGenerated = userGenerated;
+ data.userGenerated = key.userGenerated;
+ data.time = System.currentTimeMillis();
+
+ return data;
+ }
+
+ private Map<String, Object> getOutputData(AverageAlertData data, String msg)
+ {
+ Map<String, Object> output = new HashMap<String, Object>();
+ output.put("message", msg);
+ output.put("alertType", "aboveAvg");
+ output.put("userGenerated", "" + data.userGenerated);
+ output.put("alertData", data);
+
+ try {
+ String str = mapper.writeValueAsString(output);
+ logger.debug("user generated tx alert: " + str);
+ } catch (Exception exc) {
+ //ignore
+ }
+ return output;
+ }
+
+ public int getThreshold()
+ {
+ return threshold;
+ }
+
+ public void setThreshold(int threshold)
+ {
+ this.threshold = threshold;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(AverageAlertingOperator.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java
new file mode 100644
index 0000000..28cd19a
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * POJO to capture data related to alerts for repetitive bank id number data usage.
+ *
+ * @since 0.9.0
+ */
+public class BankIdNumberAlertData
+{
+ public String merchantId;
+ public int terminalId;
+ public int zipCode;
+ public MerchantTransaction.MerchantType merchantType;
+ public String bankIdNum;
+ public int count;
+ public boolean userGenerated;
+ public long time;
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberKey.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberKey.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberKey.java
new file mode 100644
index 0000000..eea6b14
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberKey.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.Serializable;
+
+import com.datatorrent.lib.util.TimeBucketKey;
+
+/**
+ * Bank Id Number Key
+ *
+ * @since 0.9.0
+ */
+public class BankIdNumberKey extends TimeBucketKey implements Serializable
+{
+ public String bankIdNum;
+
+ public BankIdNumberKey()
+ {
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int key = 0;
+ key |= (1 << 1);
+ key |= (bankIdNum.hashCode());
+ return super.hashCode() ^ key;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof BankIdNumberKey)) {
+ return false;
+ }
+ return super.equals(obj)
+ && bankIdNum.equals(((BankIdNumberKey)obj).bankIdNum);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder(super.toString());
+ sb.append("|1:").append(bankIdNum);
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java
new file mode 100644
index 0000000..0731e3c
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Count the transactions for the underlying aggregation window if the same BIN is
+ * being used for more than defined number of transactions. Output the data as needed
+ * by Mongo output operator
+ *
+ * @since 0.9.0
+ */
+public class BankIdNumberSamplerOperator extends BaseOperator
+{
+ private final transient JsonFactory jsonFactory = new JsonFactory();
+ private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory);
+ private int threshold;
+ private Map<MerchantKey, Map<String, BankIdNumData>> bankIdNumCountMap = new HashMap<MerchantKey, Map<String, BankIdNumData>>();
+ private static final String ALERT_MSG =
+ "Potential fraudulent CC transactions (same bank id %s and merchant %s) total transactions: %d";
+ /**
+ * Output the key-value pair for the BIN as key with the count as value.
+ */
+ public final transient DefaultOutputPort<String> countAlertOutputPort =
+ new DefaultOutputPort<String>();
+ public final transient DefaultOutputPort<Map<String, Object>> countAlertNotificationPort =
+ new DefaultOutputPort<Map<String, Object>>();
+
+ public int getThreshold()
+ {
+ return threshold;
+ }
+
+ public void setThreshold(int threshold)
+ {
+ this.threshold = threshold;
+ }
+
+ /*
+ public final transient DefaultInputPort<KeyValPair<MerchantKey, String>> txInputPort =
+ new DefaultInputPort<KeyValPair<MerchantKey, String>>()
+ {
+ @Override
+ public void process(KeyValPair<MerchantKey, String> tuple)
+ {
+ processTuple(tuple);
+ }
+
+ };
+
+ private void processTuple(KeyValPair<MerchantKey, String> tuple)
+ {
+ MerchantKey key = tuple.getKey();
+ Map<String, BankIdNumData> map = bankIdNumCountMap.get(key);
+ if (map == null) {
+ map = new HashMap<String, BankIdNumData>();
+ bankIdNumCountMap.put(key, map);
+ }
+ String bankIdNum = tuple.getValue();
+ BankIdNumData bankIdNumData = map.get(bankIdNum);
+ if (bankIdNumData == null) {
+ bankIdNumData = new BankIdNumData();
+ bankIdNumData.bankIdNum = bankIdNum;
+ map.put(bankIdNum, bankIdNumData);
+ }
+ bankIdNumData.count.increment();
+ if (key.userGenerated) {
+ bankIdNumData.userGenerated = true;
+ }
+ }
+ */
+
+ public final transient DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> txCountInputPort =
+ new DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>()
+ {
+ @Override
+ public void process(KeyValPair<KeyValPair<MerchantKey, String>, Integer> tuple)
+ {
+ processTuple(tuple.getKey(), tuple.getValue());
+ }
+
+ };
+
+ private void processTuple(KeyValPair<MerchantKey, String> tuple, Integer count)
+ {
+ MerchantKey key = tuple.getKey();
+ Map<String, BankIdNumData> map = bankIdNumCountMap.get(key);
+ if (map == null) {
+ map = new HashMap<String, BankIdNumData>();
+ bankIdNumCountMap.put(key, map);
+ }
+ String bankIdNum = tuple.getValue();
+ BankIdNumData bankIdNumData = map.get(bankIdNum);
+ if (bankIdNumData == null) {
+ bankIdNumData = new BankIdNumData();
+ bankIdNumData.bankIdNum = bankIdNum;
+ map.put(bankIdNum, bankIdNumData);
+ }
+ bankIdNumData.count.setValue(count);
+ if (key.userGenerated) {
+ bankIdNumData.userGenerated = true;
+ }
+ }
+
+ /**
+ * Go through the BIN Counter map and check if any of the values for the BIN exceed the threshold.
+ * If yes, generate the alert on the output port.
+ */
+ @Override
+ public void endWindow()
+ {
+ for (Map.Entry<MerchantKey, Map<String, BankIdNumData>> entry : bankIdNumCountMap.entrySet()) {
+ List<BankIdNumData> list = null;
+ MerchantKey key = entry.getKey();
+ if (key.merchantType == MerchantTransaction.MerchantType.INTERNET) {
+ continue;
+ }
+ list = dataOutput(entry.getValue());
+ if (list.size() > 0) {
+ for (BankIdNumData binData : list) {
+ BankIdNumberAlertData data = new BankIdNumberAlertData();
+ data.merchantId = key.merchantId;
+ data.terminalId = key.terminalId == null ? 0 : key.terminalId;
+ data.zipCode = key.zipCode;
+ data.merchantType = key.merchantType;
+ data.bankIdNum = binData.bankIdNum;
+ data.count = binData.count.intValue();
+ data.userGenerated = binData.userGenerated;
+ data.time = System.currentTimeMillis();
+ try {
+ countAlertOutputPort.emit(JsonUtils.toJson(data));
+ countAlertNotificationPort.emit(getOutputData(data));
+ } catch (IOException e) {
+ logger.warn("Exception while converting object to JSON: ", e);
+ }
+ }
+ }
+ }
+ bankIdNumCountMap.clear();
+ }
+
+ private List<BankIdNumData> dataOutput(Map<String, BankIdNumData> map)
+ {
+ List<BankIdNumData> list = new ArrayList<BankIdNumData>();
+ int count = 0;
+ for (Map.Entry<String, BankIdNumData> bankIdEntry : map.entrySet()) {
+ BankIdNumData data = bankIdEntry.getValue();
+ if (data.count.intValue() > threshold) {
+ list.add(data);
+ }
+ }
+ return list;
+ }
+
+ private Map<String, Object> getOutputData(BankIdNumberAlertData data)
+ {
+ Map<String, Object> output = new HashMap<String, Object>();
+ output.put("message", String.format(ALERT_MSG, data.bankIdNum, data.merchantId, data.count));
+ output.put("alertType", "sameBankId");
+ output.put("userGenerated", "" + data.userGenerated);
+ output.put("alertData", data);
+
+ try {
+ String str = mapper.writeValueAsString(output);
+ logger.debug("user generated tx alert: " + str);
+ } catch (Exception exc) {
+ //ignore
+ }
+
+ return output;
+ }
+
+ public static final class BankIdNumData
+ {
+ public String bankIdNum;
+ public MutableLong count = new MutableLong();
+ public boolean userGenerated = false;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(BankIdNumberSamplerOperator.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java
new file mode 100644
index 0000000..f46f84a
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * POJO to capture data related to alerts for credit card number.
+ *
+ * @since 0.9.0
+ */
+public class CreditCardAlertData
+{
+ public String merchantId;
+ public int terminalId;
+ public int zipCode;
+ public MerchantTransaction.MerchantType merchantType;
+ public String fullCcNum;
+ public long small;
+ public long large;
+ public double threshold;
+ public boolean userGenerated;
+ public long time;
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAmountSamplerOperator.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAmountSamplerOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAmountSamplerOperator.java
new file mode 100644
index 0000000..235e36e
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAmountSamplerOperator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.frauddetect.util.JsonUtils;
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * An operator to alert in case a transaction of a small lowAmount is followed by a transaction which is significantly larger for a given credit card number.
+ * This is done for each transaction. This also means that this happens for each individual credit card.
+ * It accepts merchant transaction object and for each CC listed in the transaction(s), checks for the transaction amounts. An alert is raised if the transaction
+ * lowAmount is significantly > the lowest amt in this window.
+ *
+ * @since 0.9.0
+ */
+public class CreditCardAmountSamplerOperator extends BaseOperator
+{
+ private final transient JsonFactory jsonFactory = new JsonFactory();
+ private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory);
+ private static final Logger logger = LoggerFactory.getLogger(Application.class);
+ // Factor to be applied to existing lowAmount to flag potential alerts.
+ private double threshold = 9500;
+ private Map<String, CreditCardInfo> ccTxnMap = new HashMap<String, CreditCardInfo>();
+ //private Map<String, MutableLong> ccQueryTxnMap = new HashMap<String, MutableLong>();
+ private List<CreditCardAlertData> alerts = new ArrayList<CreditCardAlertData>();
+ //private List<CreditCardAlertData> userAlerts = new ArrayList<CreditCardAlertData>();
+ private static final String ALERT_MSG =
+ "Potential fraudulent CC transactions (small one USD %d followed by large USD %d) performed using credit card: %s";
+ public final transient DefaultOutputPort<String> ccAlertOutputPort = new DefaultOutputPort<String>();
+ /*
+ public final transient DefaultOutputPort<Map<String, Object>> ccUserAlertOutputPort = new DefaultOutputPort<Map<String, Object>>();
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> ccAlertNotificationPort = new DefaultOutputPort<Map<String, Object>>();
+
+ public double getThreshold()
+ {
+ return threshold;
+ }
+
+ public void setThreshold(double threshold)
+ {
+ this.threshold = threshold;
+ }
+
+ private void processTuple(KeyValPair<MerchantKey, CreditCardData> tuple, Map<String, CreditCardInfo> txMap)
+ {
+ String fullCcNum = tuple.getValue().fullCcNum;
+ long ccAmount = tuple.getValue().amount;
+ MerchantKey key = tuple.getKey();
+
+ CreditCardInfo cardInfo = txMap.get(fullCcNum);
+
+ if (cardInfo != null) {
+ long currentSmallValue = cardInfo.lowAmount.longValue();
+ if (ccAmount < currentSmallValue) {
+ cardInfo.lowAmount.setValue(ccAmount);
+ cardInfo.time = key.time;
+ } else if (ccAmount > (currentSmallValue + threshold)) {
+ // If the transaction lowAmount is > 70% of the min. lowAmount, send an alert.
+
+ CreditCardAlertData data = new CreditCardAlertData();
+
+ data.merchantId = key.merchantId;
+ data.terminalId = key.terminalId == null ? 0 : key.terminalId;
+ data.zipCode = key.zipCode;
+ data.merchantType = key.merchantType;
+ data.fullCcNum = fullCcNum;
+ data.small = currentSmallValue;
+ data.large = ccAmount;
+ data.threshold = threshold;
+ data.userGenerated = key.userGenerated;
+ data.time = System.currentTimeMillis();
+
+ alerts.add(data);
+
+ /*
+ if (userGenerated){
+ userAlerts.add(data);
+ }
+ */
+ ccAlertNotificationPort.emit(getOutputData(data));
+
+ // Any high value transaction after a low value transaction with difference greater than threshold
+ // will trigger the alert. Not resetting the low value also helps in a system generated transaction
+ // alert not resetting the low value from a user generated transaction
+ //txMap.remove(fullCcNum);
+ }
+ } else {
+ cardInfo = new CreditCardInfo();
+ cardInfo.lowAmount.setValue(ccAmount);
+ cardInfo.time = key.time;
+ txMap.put(fullCcNum, cardInfo);
+ }
+ }
+
+ public transient DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>> inputPort =
+ new DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>>()
+ {
+ //
+ // This function checks if a CC entry exists.
+ // If so, it checks whether the current transaction is for an lowAmount lesser than the one stored in the hashmap. If so, this becomes the min. transaction lowAmount.
+ // If the lowAmount is > 70% of the existing lowAmount in the hash map, raise an alert.
+ //
+ @Override
+ public void process(KeyValPair<MerchantKey, CreditCardData> tuple)
+ {
+
+ processTuple(tuple, ccTxnMap);
+
+ }
+
+ };
+
+ @Override
+ public void endWindow()
+ {
+
+ for (CreditCardAlertData data : alerts) {
+ try {
+ ccAlertOutputPort.emit(JsonUtils.toJson(data));
+ } catch (IOException e) {
+ logger.warn("Exception while converting object to JSON", e);
+ }
+ }
+
+ //for (CreditCardAlertData data: userAlerts) {
+ /*for (CreditCardAlertData data: alerts) {
+ ccAlertNotificationPort.emit(getOutputData(data));
+ }*/
+
+ long ctime = System.currentTimeMillis();
+ Iterator<Map.Entry<String, CreditCardInfo>> iterator = ccTxnMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, CreditCardInfo> entry = iterator.next();
+ long time = entry.getValue().time;
+ if ((ctime - time) > 60000) {
+ iterator.remove();
+ }
+ }
+
+ //ccTxnMap.clear();
+ alerts.clear();
+
+ //ccQueryTxnMap.clear();
+ //userAlerts.clear();
+ }
+
+ private static class CreditCardInfo
+ {
+ MutableLong lowAmount = new MutableLong();
+ Long time;
+ }
+
+ private Map<String, Object> getOutputData(CreditCardAlertData data)
+ {
+ Map<String, Object> output = new HashMap<String, Object>();
+ output.put("message", String.format(ALERT_MSG, data.small, data.large, data.fullCcNum));
+ output.put("alertType", "smallThenLarge");
+ output.put("userGenerated", "" + data.userGenerated);
+ output.put("alertData", data);
+
+ try {
+ String str = mapper.writeValueAsString(output);
+ logger.debug("Alert generated: " + str + " userGenerated: " + data.userGenerated);
+ } catch (Exception exc) {
+ //ignore
+ }
+
+ return output;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java
new file mode 100644
index 0000000..7c667d6
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * Credit Card Data
+ *
+ * @since 0.9.0
+ */
+public class CreditCardData
+{
+ public String fullCcNum;
+ public long amount;
+
+ public CreditCardData()
+ {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java
new file mode 100644
index 0000000..d73c693
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+
+import java.io.Serializable;
+
+/**
+ * A time-based key for merchant data.
+ *
+ * @since 0.9.0
+ */
+public class MerchantKey implements Serializable
+{
+ public String merchantId;
+ public Integer terminalId;
+ public Integer zipCode;
+ public String country;
+ public MerchantTransaction.MerchantType merchantType;
+ public Long time;
+ public boolean userGenerated;
+
+ public MerchantKey()
+ {
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int key = 0;
+ if (merchantId != null) {
+ key |= (1 << 1);
+ key |= (merchantId.hashCode());
+ }
+ if (terminalId != null) {
+ key |= (1 << 2);
+ key |= (terminalId << 2);
+ }
+ if (zipCode != null) {
+ key |= (1 << 3);
+ key |= (zipCode << 3);
+ }
+ if (country != null) {
+ key |= (1 << 4);
+ key |= (country.hashCode());
+ }
+ if (merchantType != null) {
+ key |= (1 << 5);
+ key |= (merchantType.hashCode());
+ }
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof MerchantKey)) {
+ return false;
+ }
+ MerchantKey mkey = (MerchantKey)obj;
+ return checkStringEqual(this.merchantId, mkey.merchantId)
+ && checkIntEqual(this.terminalId, mkey.terminalId)
+ && checkIntEqual(this.zipCode, mkey.zipCode)
+ && checkStringEqual(this.country, mkey.country)
+ && checkIntEqual(this.merchantType.ordinal(), mkey.merchantType.ordinal());
+ }
+
+ private boolean checkIntEqual(Integer a, Integer b)
+ {
+ if ((a == null) && (b == null)) {
+ return true;
+ }
+ if ((a != null) && (b != null) && a.intValue() == b.intValue()) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean checkStringEqual(String a, String b)
+ {
+ if ((a == null) && (b == null)) {
+ return true;
+ }
+ if ((a != null) && a.equals(b)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ if (merchantId != null) {
+ sb.append("|1:").append(merchantId);
+ }
+ if (terminalId != null) {
+ sb.append("|2:").append(terminalId);
+ }
+ if (zipCode != null) {
+ sb.append("|3:").append(zipCode);
+ }
+ if (country != null) {
+ sb.append("|4:").append(country);
+ }
+ if (merchantType != null) {
+ sb.append("|5:").append(merchantType);
+ }
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java
new file mode 100644
index 0000000..e6a4680
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.util.Map;
+
+/**
+ * Common utility class that can be used by all other operators to handle user input
+ * captured from the Web socket input port.
+ *
+ * @since 0.9.0
+ */
+public class MerchantQueryInputHandler
+{
+ public static final String KEY_DATA = "data";
+ public static final String KEY_MERCHANT_ID = "merchantId";
+ public static final String KEY_TERMINAL_ID = "terminalId";
+ public static final String KEY_ZIP_CODE = "zipCode";
+
+ public static MerchantKey process(Map<String, Object> tuple)
+ {
+ String merchantId = null;
+ Integer terminalId = null;
+ Integer zipCode = null;
+
+ // ignoring other top-level attributes.
+ Map<String, Object> data = (Map<String, Object>)tuple.get(KEY_DATA);
+ if (data.get(KEY_MERCHANT_ID) != null) {
+ merchantId = (String)data.get(KEY_MERCHANT_ID);
+ }
+ if (data.get(KEY_TERMINAL_ID) != null) {
+ terminalId = (Integer)data.get(KEY_TERMINAL_ID);
+ }
+ if (data.get(KEY_ZIP_CODE) != null) {
+ zipCode = (Integer)data.get(KEY_ZIP_CODE);
+ }
+
+ MerchantKey key = new MerchantKey();
+ key.merchantId = merchantId;
+ key.terminalId = terminalId;
+ key.zipCode = zipCode;
+ key.country = "USA";
+ if (merchantId != null) {
+ key.merchantType = key.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[2])
+ || key.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[3])
+ ? MerchantTransaction.MerchantType.INTERNET
+ : MerchantTransaction.MerchantType.BRICK_AND_MORTAR;
+ }
+ return key;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java
new file mode 100644
index 0000000..a722492
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.Serializable;
+
+/**
+ * POJO for BIN Alert related data.
+ *
+ * @since 0.9.0
+ */
+public class MerchantTransaction implements Serializable
+{
+ public enum MerchantType
+ {
+ UNDEFINED, BRICK_AND_MORTAR, INTERNET
+ }
+
+ public enum TransactionType
+ {
+ UNDEFINED, POS
+ }
+
+ public String ccNum;
+ public String bankIdNum;
+ public String fullCcNum;
+ public Long amount;
+ public String merchantId;
+ public Integer terminalId;
+ public Integer zipCode;
+ public String country;
+ public MerchantType merchantType = MerchantType.UNDEFINED;
+ public TransactionType transactionType = TransactionType.UNDEFINED;
+ public Long time;
+ public boolean userGenerated;
+
+ public MerchantTransaction()
+ {
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int key = 0;
+ if (ccNum != null) {
+ key |= (1 << 1);
+ key |= (ccNum.hashCode());
+ }
+ if (bankIdNum != null) {
+ key |= (1 << 2);
+ key |= (bankIdNum.hashCode());
+ }
+ if (amount != null) {
+ key |= (1 << 6);
+ key |= (amount << 4);
+ }
+ if (merchantId != null) {
+ key |= (1 << 3);
+ key |= (merchantId.hashCode());
+ }
+ if (terminalId != null) {
+ key |= (1 << 4);
+ key |= (terminalId << 2);
+ }
+ if (zipCode != null) {
+ key |= (1 << 5);
+ key |= (zipCode << 3);
+ }
+ if (country != null) {
+ key |= (1 << 7);
+ key |= (country.hashCode());
+ }
+ if (merchantType != null) {
+ key |= (1 << 8);
+ key |= (merchantType.hashCode());
+ }
+ if (transactionType != null) {
+ key |= (1 << 9);
+ key |= (transactionType.hashCode());
+ }
+ if (fullCcNum != null) {
+ key |= (1 << 10);
+ key |= (fullCcNum.hashCode());
+ }
+ if (time != null) {
+ key |= (1 << 11);
+ key |= (time << 2);
+ }
+
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof MerchantTransaction)) {
+ return false;
+ }
+ MerchantTransaction mtx = (MerchantTransaction)obj;
+ return checkStringEqual(this.ccNum, mtx.ccNum)
+ && checkStringEqual(this.bankIdNum, mtx.bankIdNum)
+ && checkLongEqual(this.amount, mtx.amount)
+ && checkStringEqual(this.merchantId, mtx.merchantId)
+ && checkIntEqual(this.terminalId, mtx.terminalId)
+ && checkIntEqual(this.zipCode, mtx.zipCode)
+ && checkStringEqual(this.country, mtx.country)
+ && checkIntEqual(this.merchantType.ordinal(), mtx.merchantType.ordinal())
+ && checkIntEqual(this.transactionType.ordinal(), mtx.transactionType.ordinal())
+ && checkStringEqual(this.fullCcNum, mtx.fullCcNum)
+ && checkLongEqual(this.time, mtx.time);
+ }
+
+ private boolean checkIntEqual(Integer a, Integer b)
+ {
+ if ((a == null) && (b == null)) {
+ return true;
+ }
+ if ((a != null) && (b != null) && a.intValue() == b.intValue()) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean checkLongEqual(Long a, Long b)
+ {
+ if ((a == null) && (b == null)) {
+ return true;
+ }
+ if ((a != null) && (b != null) && a.longValue() == b.longValue()) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean checkStringEqual(String a, String b)
+ {
+ if ((a == null) && (b == null)) {
+ return true;
+ }
+ if ((a != null) && a.equals(b)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ if (ccNum != null) {
+ sb.append("|0:").append(ccNum);
+ }
+ if (bankIdNum != null) {
+ sb.append("|1:").append(bankIdNum);
+ }
+ if (fullCcNum != null) {
+ sb.append("|2:").append(fullCcNum);
+ }
+ if (amount != null) {
+ sb.append("|3:").append(amount);
+ }
+ if (merchantId != null) {
+ sb.append("|4:").append(merchantId);
+ }
+ if (terminalId != null) {
+ sb.append("|5:").append(terminalId);
+ }
+ if (zipCode != null) {
+ sb.append("|6:").append(zipCode);
+ }
+ if (country != null) {
+ sb.append("|7:").append(country);
+ }
+ if (merchantType != null) {
+ sb.append("|8:").append(merchantType);
+ }
+ if (transactionType != null) {
+ sb.append("|9:").append(transactionType);
+ }
+ if (time != null) {
+ sb.append("|10:").append(time);
+ }
+ return sb.toString();
+ }
+
+}