You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:13 UTC

[08/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java
new file mode 100644
index 0000000..8132c7d
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.machinedata.data.MachineKey;
+import org.apache.apex.examples.machinedata.operator.MachineInfoAveragingOperator;
+import org.apache.apex.examples.machinedata.operator.MachineInfoAveragingPrerequisitesOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
+import com.datatorrent.lib.io.SmtpOutputOperator;
+
+/**
+ * <p>
+ * Resource monitor application.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+@ApplicationAnnotation(name = "MachineDataExample")
+@SuppressWarnings("unused")
+public class Application implements StreamingApplication
+{
+
+  private static final Logger LOG = LoggerFactory.getLogger(Application.class);
+
+  /**
+   * This function sets up the DAG for calculating the average
+   *
+   * @param dag  the DAG instance
+   * @param conf the configuration instance
+   * @return MachineInfoAveragingPrerequisitesOperator
+   */
+  private MachineInfoAveragingPrerequisitesOperator addAverageCalculation(DAG dag, Configuration conf)
+  {
+    MachineInfoAveragingPrerequisitesOperator prereqAverageOper = dag.addOperator("Aggregator", MachineInfoAveragingPrerequisitesOperator.class);
+    MachineInfoAveragingOperator averageOperator = dag.addOperator("AverageCalculator", MachineInfoAveragingOperator.class);
+    RedisKeyValPairOutputOperator<MachineKey, Map<String, String>> redisAvgOperator = dag.addOperator("Persister", new RedisKeyValPairOutputOperator<MachineKey, Map<String, String>>());
+    dag.addStream("Average", averageOperator.outputPort, redisAvgOperator.input);
+    SmtpOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpOutputOperator());
+    dag.addStream("Aggregates", prereqAverageOper.outputPort, averageOperator.inputPort);
+    dag.addStream("Alerts", averageOperator.smtpAlert, smtpOutputOperator.input);
+    return prereqAverageOper;
+  }
+
+  /**
+   * Create the DAG
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    InputReceiver randomGen = dag.addOperator("Receiver", InputReceiver.class);
+    DimensionGenerator dimensionGenerator = dag.addOperator("DimensionsGenerator", DimensionGenerator.class);
+    dag.addStream("Events", randomGen.outputInline, dimensionGenerator.inputPort);
+    MachineInfoAveragingPrerequisitesOperator prereqAverageOper = addAverageCalculation(dag, conf);
+    dag.addStream("DimensionalData", dimensionGenerator.outputInline, prereqAverageOper.inputPort);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java
new file mode 100644
index 0000000..6c717c2
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata;
+
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+
+/**
+ * <p>
+ * Information tuple generator with randomness.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+@SuppressWarnings("unused")
+public class DimensionGenerator extends BaseOperator
+{
+  public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
+  public transient DefaultOutputPort<MachineInfo> output = new DefaultOutputPort<>();
+  private int threshold = 90;
+
+  public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
+  {
+
+    @Override
+    public void process(MachineInfo tuple)
+    {
+      emitDimensions(tuple);
+    }
+
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+  }
+
+  /**
+   * This returns the threshold value set
+   * @return
+   */
+  public int getThreshold()
+  {
+    return threshold;
+  }
+
+  /**
+   * This function sets the threshold value. This value is used to check the maximum value for cpu/ram/hdd
+   * @param threshold
+   */
+  public void setThreshold(int threshold)
+  {
+    this.threshold = threshold;
+  }
+
+  /**
+   * This function takes in the tuple from upstream operator and generates tuples with different dimension combinations
+   *
+   * @param tuple
+   */
+  private void emitDimensions(MachineInfo tuple)
+  {
+    MachineKey tupleKey = tuple.getMachineKey();
+
+    for (int i = 0; i < 64; i++) {
+      MachineKey machineKey = new MachineKey(tupleKey.getTimeKey(),tupleKey.getDay());
+      if ((i & 1) != 0) {
+        machineKey.setCustomer(tupleKey.getCustomer());
+      }
+      if ((i & 2) != 0) {
+        machineKey.setProduct(tupleKey.getProduct());
+      }
+      if ((i & 4) != 0) {
+        machineKey.setOs(tupleKey.getOs());
+      }
+      if ((i & 8) != 0) {
+        machineKey.setDeviceId(tupleKey.getDeviceId());
+      }
+      if ((i & 16) != 0) {
+        machineKey.setSoftware1(tupleKey.getSoftware1());
+      }
+      if ((i & 32) != 0) {
+        machineKey.setSoftware2(tupleKey.getSoftware2());
+      }
+
+      int cpu = tuple.getCpu();
+      int ram = tuple.getRam();
+      int hdd = tuple.getHdd();
+      MachineInfo machineInfo = new MachineInfo();
+      machineInfo.setMachineKey(machineKey);
+      machineInfo.setCpu((cpu < threshold) ? cpu : threshold);
+      machineInfo.setRam((ram < threshold) ? ram : threshold);
+      machineInfo.setHdd((hdd < threshold) ? hdd : threshold);
+      outputInline.emit(machineInfo);
+      output.emit(machineInfo);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java
new file mode 100644
index 0000000..263db55
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Random;
+import java.util.TimeZone;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>
+ * Information tuple generator with randomness.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+@SuppressWarnings("unused")
+public class InputReceiver extends BaseOperator implements InputOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(InputReceiver.class);
+
+  public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
+  private final Random randomGen = new Random();
+
+  private int customerMin = 1;
+  private int customerMax = 5;
+  private int productMin = 4;
+  private int productMax = 6;
+  private int osMin = 10;
+  private int osMax = 12;
+  private int software1Min = 10;
+  private int software1Max = 12;
+  private int software2Min = 12;
+  private int software2Max = 14;
+  private int software3Min = 4;
+  private int software3Max = 6;
+
+  private int deviceIdMin = 1;
+  private int deviceIdMax = 50;
+
+  private int tupleBlastSize = 1001;
+
+  private static final DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
+  private static final DateFormat dayDateFormat = new SimpleDateFormat("d");
+
+  static {
+    TimeZone tz = TimeZone.getTimeZone("GMT");
+    minuteDateFormat.setTimeZone(tz);
+    dayDateFormat.setTimeZone(tz);
+
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    int count = 0;
+    Calendar calendar = Calendar.getInstance();
+    Date date = calendar.getTime();
+    String timeKey = minuteDateFormat.format(date);
+    String day = dayDateFormat.format(date);
+
+    while (count < tupleBlastSize) {
+      randomGen.setSeed(System.currentTimeMillis());
+
+      int customerVal = genCustomerId();
+      int productVal = genProductVer();
+      int osVal = genOsVer();
+      int software1Val = genSoftware1Ver();
+      int software2Val = genSoftware2Ver();
+      int software3Val = genSoftware3Ver();
+      int deviceIdVal = genDeviceId();
+
+      int cpuVal = genCpu(calendar);
+      int ramVal = genRam(calendar);
+      int hddVal = genHdd(calendar);
+
+      MachineKey machineKey = new MachineKey(timeKey, day);
+
+      machineKey.setCustomer(customerVal);
+      machineKey.setProduct(productVal);
+      machineKey.setOs(osVal);
+      machineKey.setDeviceId(deviceIdVal);
+      machineKey.setSoftware1(software1Val);
+      machineKey.setSoftware2(software2Val);
+      machineKey.setSoftware3(software3Val);
+      MachineInfo machineInfo = new MachineInfo();
+      machineInfo.setMachineKey(machineKey);
+      machineInfo.setCpu(cpuVal);
+      machineInfo.setRam(ramVal);
+      machineInfo.setHdd(hddVal);
+
+      outputInline.emit(machineInfo);
+
+      count++;
+    }
+  }
+
+  private int genCustomerId()
+  {
+    int range = customerMax - customerMin + 1;
+    return customerMin + randomGen.nextInt(range);
+  }
+
+  private int genProductVer()
+  {
+    int range = productMax - productMin + 1;
+    return productMin + randomGen.nextInt(range);
+  }
+
+  private int genOsVer()
+  {
+    int range = osMax - osMin + 1;
+    return osMin + randomGen.nextInt(range);
+  }
+
+  private int genSoftware3Ver()
+  {
+    int range = software3Max - software3Min + 1;
+    return software3Min + randomGen.nextInt(range);
+  }
+
+  private int genDeviceId()
+  {
+    int range = deviceIdMax - deviceIdMin + 1;
+    return deviceIdMin + randomGen.nextInt(range);
+  }
+
+  private int genSoftware1Ver()
+  {
+    int range = software1Max - software1Min + 1;
+    return software1Min + randomGen.nextInt(range);
+  }
+
+  private int genSoftware2Ver()
+  {
+    int range = software2Max - software2Min + 1;
+    return software2Min + randomGen.nextInt(range);
+  }
+
+  private int genCpu(Calendar cal)
+  {
+    int minute = cal.get(Calendar.MINUTE);
+    int second;
+    int range = minute / 2 + 19;
+    if (minute / 17 == 0) {
+      second = cal.get(Calendar.SECOND);
+      return (30 + randomGen.nextInt(range) + (minute % 7) - (second % 11));
+    } else if (minute / 47 == 0) {
+      second = cal.get(Calendar.SECOND);
+      return (7 + randomGen.nextInt(range) + (minute % 7) - (second % 7));
+    } else {
+      second = cal.get(Calendar.SECOND);
+      return (randomGen.nextInt(range) + (minute % 19) + (second % 7));
+    }
+  }
+
+  private int genRam(Calendar cal)
+  {
+    int minute = cal.get(Calendar.MINUTE);
+    int second;
+    int range = minute + 1;
+    if (minute / 23 == 0) {
+      second = cal.get(Calendar.SECOND);
+      return (20 + randomGen.nextInt(range) + (minute % 5) - (second % 11));
+    } else if (minute / 37 == 0) {
+      second = cal.get(Calendar.SECOND);
+      return (11 + randomGen.nextInt(60) - (minute % 5) - (second % 11));
+    } else {
+      second = cal.get(Calendar.SECOND);
+      return (randomGen.nextInt(range) + (minute % 17) + (second % 11));
+    }
+  }
+
+  private int genHdd(Calendar cal)
+  {
+    int minute = cal.get(Calendar.MINUTE);
+    int second;
+    int range = minute / 2 + 1;
+    if (minute / 37 == 0) {
+      second = cal.get(Calendar.SECOND);
+      return (25 + randomGen.nextInt(range) - minute % 7 - second % 11);
+    } else {
+      second = cal.get(Calendar.SECOND);
+      return (randomGen.nextInt(range) + minute % 23 + second % 11);
+    }
+  }
+
+  /**
+   * This method returns the minimum value for customer
+   *
+   * @return
+   */
+  public int getCustomerMin()
+  {
+    return customerMin;
+  }
+
+  /**
+   * This method is used to set the minimum value for customer
+   *
+   * @param customerMin the minimum customer value
+   */
+  public void setCustomerMin(int customerMin)
+  {
+    this.customerMin = customerMin;
+  }
+
+  /**
+   * This method returns the max value for customer
+   *
+   * @return
+   */
+  public int getCustomerMax()
+  {
+    return customerMax;
+  }
+
+  /**
+   * This method is used to set the max value for customer
+   *
+   * @param customerMax the max customer value
+   */
+  public void setCustomerMax(int customerMax)
+  {
+    this.customerMax = customerMax;
+  }
+
+  /**
+   * This method returns the minimum value for product
+   *
+   * @return
+   */
+  public int getProductMin()
+  {
+    return productMin;
+  }
+
+  /**
+   * This method is used to set the minimum value for product
+   *
+   * @param productMin the minimum product value
+   */
+  public void setProductMin(int productMin)
+  {
+    this.productMin = productMin;
+  }
+
+  /**
+   * This method returns the max value for product
+   *
+   * @return
+   */
+  public int getProductMax()
+  {
+    return productMax;
+  }
+
+  /**
+   * This method is used to set the max value for product
+   *
+   * @param productMax the max product value
+   */
+  public void setProductMax(int productMax)
+  {
+    this.productMax = productMax;
+  }
+
+  /**
+   * This method returns the minimum value for OS
+   *
+   * @return
+   */
+  public int getOsMin()
+  {
+    return osMin;
+  }
+
+  /**
+   * This method is used to set the minimum value for OS
+   *
+   * @param osMin the min OS value
+   */
+  public void setOsMin(int osMin)
+  {
+    this.osMin = osMin;
+  }
+
+  /**
+   * This method returns the max value for OS
+   *
+   * @return
+   */
+  public int getOsMax()
+  {
+    return osMax;
+  }
+
+  /**
+   * This method is used to set the max value for OS
+   *
+   * @param osMax the max OS value
+   */
+  public void setOsMax(int osMax)
+  {
+    this.osMax = osMax;
+  }
+
+  /**
+   * This method returns the minimum value for software1
+   *
+   * @return
+   */
+  public int getSoftware1Min()
+  {
+    return software1Min;
+  }
+
+  /**
+   * This method is used to set the minimum value for software1
+   *
+   * @param software1Min the minimum software1 value
+   */
+  public void setSoftware1Min(int software1Min)
+  {
+    this.software1Min = software1Min;
+  }
+
+  /**
+   * This method returns the max value for software1
+   *
+   * @return
+   */
+  public int getSoftware1Max()
+  {
+    return software1Max;
+  }
+
+  /**
+   * This method is used to set the max value for software1
+   *
+   * @param software1Max the max software1 value
+   */
+  public void setSoftware1Max(int software1Max)
+  {
+    this.software1Max = software1Max;
+  }
+
+  /**
+   * This method returns the minimum value for software2
+   *
+   * @return
+   */
+  public int getSoftware2Min()
+  {
+    return software2Min;
+  }
+
+  /**
+   * This method is used to set the minimum value for software2
+   *
+   * @param software2Min the minimum software2 value
+   */
+  public void setSoftware2Min(int software2Min)
+  {
+    this.software2Min = software2Min;
+  }
+
+  /**
+   * This method returns the max value for software2
+   *
+   * @return
+   */
+  public int getSoftware2Max()
+  {
+    return software2Max;
+  }
+
+  /**
+   * This method is used to set the max value for software2
+   *
+   * @param software2Max the max software2 value
+   */
+  public void setSoftware2Max(int software2Max)
+  {
+    this.software2Max = software2Max;
+  }
+
+  /**
+   * This method returns the minimum value for software3
+   *
+   * @return
+   */
+  public int getSoftware3Min()
+  {
+    return software3Min;
+  }
+
+  /**
+   * This method is used to set the minimum value for software3
+   *
+   * @param software3Min the minimum software3 value
+   */
+  public void setSoftware3Min(int software3Min)
+  {
+    this.software3Min = software3Min;
+  }
+
+  /**
+   * This method returns the max value for software3
+   *
+   * @return
+   */
+  public int getSoftware3Max()
+  {
+    return software3Max;
+  }
+
+  /**
+   * This method is used to set the max value for software3
+   *
+   * @param software3Max the max software3 value
+   */
+  public void setSoftware3Max(int software3Max)
+  {
+    this.software3Max = software3Max;
+  }
+
+  /**
+   * This method returns the minimum value for deviceId
+   *
+   * @return
+   */
+  public int getDeviceIdMin()
+  {
+    return deviceIdMin;
+  }
+
+  /**
+   * This method is used to set the minimum value for deviceId
+   *
+   * @param deviceIdMin the minimum deviceId value
+   */
+  public void setDeviceIdMin(int deviceIdMin)
+  {
+    this.deviceIdMin = deviceIdMin;
+  }
+
+  /**
+   * This method returns the max value for deviceId
+   *
+   * @return
+   */
+  public int getDeviceIdMax()
+  {
+    return deviceIdMax;
+  }
+
+  /**
+   * This method is used to set the max value for deviceId
+   *
+   * @param deviceIdMax the max deviceId value
+   */
+  public void setDeviceIdMax(int deviceIdMax)
+  {
+    this.deviceIdMax = deviceIdMax;
+  }
+
+  /**
+   * @return the tupleBlastSize
+   */
+  public int getTupleBlastSize()
+  {
+    return tupleBlastSize;
+  }
+
+  /**
+   * @param tupleBlastSize the tupleBlastSize to set
+   */
+  public void setTupleBlastSize(int tupleBlastSize)
+  {
+    this.tupleBlastSize = tupleBlastSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java
new file mode 100644
index 0000000..3c12335
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.data;
+
+
+/**
+ * This class stores the value of sum and the count of values summed.
+ * <p>
+ * AverageData class.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+public class AverageData
+{
+
+  private long cpu;
+  private long hdd;
+  private long ram;
+  private long count;
+
+  /**
+   * This is default constructor that sets the sum and count to 0
+   */
+  public AverageData()
+  {
+
+  }
+
+  /**
+   * This constructor takes the value of sum and count and initialize the local attributes to corresponding values
+   *
+   * @param count
+   *          the value of count
+   */
+  public AverageData(long cpu,long hdd,long ram, long count)
+  {
+    this.cpu = cpu;
+    this.ram = ram;
+    this.hdd = hdd;
+    this.count = count;
+  }
+
+  public long getCpu()
+  {
+    return cpu;
+  }
+
+  public void setCpu(long cpu)
+  {
+    this.cpu = cpu;
+  }
+
+  public long getHdd()
+  {
+    return hdd;
+  }
+
+  public void setHdd(long hdd)
+  {
+    this.hdd = hdd;
+  }
+
+  public long getRam()
+  {
+    return ram;
+  }
+
+  public void setRam(long ram)
+  {
+    this.ram = ram;
+  }
+
+  /**
+   * This returns the value of count
+   * @return
+   */
+  public long getCount()
+  {
+    return count;
+  }
+
+  /**
+   * This method sets the value of count
+   * @param count
+   */
+  public void setCount(long count)
+  {
+    this.count = count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java
new file mode 100644
index 0000000..3952b70
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.data;
+
+/**
+ * This class stores the cpu% usage, ram% usage, hdd% usage and key information about a particular machine
+ * <p>
+ * MachineInfo class.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+public class MachineInfo
+{
+  private MachineKey machineKey;
+  private int cpu;
+  private int ram;
+  private int hdd;
+
+  /**
+   * This default constructor
+   */
+  public MachineInfo()
+  {
+  }
+
+  /**
+   * This constructor takes MachineKey as input and initialize local attributes
+   *
+   * @param machineKey
+   *          the MachineKey instance
+   */
+  public MachineInfo(MachineKey machineKey)
+  {
+    this.machineKey = machineKey;
+  }
+
+  /**
+   * This constructor takes MachineKey, cpu usage, ram usage, hdd usage as input and initialize local attributes
+   *
+   * @param machineKey
+   *          the MachineKey instance
+   * @param cpu
+   *          the CPU% usage
+   * @param ram
+   *          the RAM% usage
+   * @param hdd
+   *          the HDD% usage
+   */
+  public MachineInfo(MachineKey machineKey, int cpu, int ram, int hdd)
+  {
+    this.machineKey = machineKey;
+    this.cpu = cpu;
+    this.ram = ram;
+    this.hdd = hdd;
+  }
+
+  /**
+   * This method returns the MachineKey
+   *
+   * @return
+   */
+  public MachineKey getMachineKey()
+  {
+    return machineKey;
+  }
+
+  /**
+   * This method sets the MachineKey
+   *
+   * @param machineKey
+   *          the MachineKey instance
+   */
+  public void setMachineKey(MachineKey machineKey)
+  {
+    this.machineKey = machineKey;
+  }
+
+  /**
+   * This method returns the CPU% usage
+   *
+   * @return
+   */
+  public int getCpu()
+  {
+    return cpu;
+  }
+
+  /**
+   * This method sets the CPU% usage
+   *
+   * @param cpu
+   *          the CPU% usage
+   */
+  public void setCpu(int cpu)
+  {
+    this.cpu = cpu;
+  }
+
+  /**
+   * This method returns the RAM% usage
+   *
+   * @return
+   */
+  public int getRam()
+  {
+    return ram;
+  }
+
+  /**
+   * This method sets the RAM% usage
+   *
+   * @param ram
+   *          the RAM% usage
+   */
+  public void setRam(int ram)
+  {
+    this.ram = ram;
+  }
+
+  /**
+   * This method returns the HDD% usage
+   *
+   * @return
+   */
+  public int getHdd()
+  {
+    return hdd;
+  }
+
+  /**
+   * This method sets the HDD% usage
+   *
+   * @param hdd
+   *          the HDD% usage
+   */
+  public void setHdd(int hdd)
+  {
+    this.hdd = hdd;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java
new file mode 100644
index 0000000..2bf0a53
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.data;
+
+/**
+ * This class stores the information about various softwares, deviceIds, OS of the device
+ * <p>
+ * MachineKey class.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+public class MachineKey
+{
+
+  private Integer customer;
+  private Integer product;
+  private Integer os;
+  private Integer software1;
+  private Integer software2;
+  private Integer software3;
+  private Integer deviceId;
+  private String timeKey;
+  private String day;
+
+  /**
+   * This constructor takes the format in which time has to be captured and the day when this instance is created
+   *
+   * @param timeKey the format in which time has to be captured
+   * @param day the day when this instance is created
+   */
+  public MachineKey(String timeKey, String day)
+  {
+    this.timeKey = timeKey;
+    this.day = day;
+  }
+
+  /**
+   * This is default constructor
+   */
+  public MachineKey()
+  {
+  }
+
+  /**
+   * This constructor takes format in which time has to be captured, the day when this instance is created, the customer
+   * id, product Id on the device, OS version on the device, software1 version on the device, software2 version on the device,
+   * software3 version on the device, deviceId on the device,
+   *
+   * @param timeKey the format in which time has to be captured
+   * @param day the day when this instance is created
+   * @param customer the customer Id
+   * @param product product Id
+   * @param os OS version
+   * @param software1 software1 version
+   * @param software2 software2 version
+   * @param software3 software3 version
+   * @param deviceId deviceId
+   */
+  public MachineKey(String timeKey, String day, Integer customer, Integer product, Integer os, Integer software1, Integer software2, Integer software3, Integer deviceId)
+  {
+    this.timeKey = timeKey;
+    this.day = day;
+    this.customer = customer;
+    this.product = product;
+    this.os = os;
+    this.software1 = software1;
+    this.software2 = software2;
+    this.software3 = software3;
+    this.deviceId = deviceId;
+  }
+
+  /**
+   * This method returns the format in which the time is captured. The time is the time when this instance of MachineKey
+   * was generated. For e.g. HHmm to capture Hour and minute
+   *
+   * @return
+   */
+  public String getTimeKey()
+  {
+    return timeKey;
+  }
+
+  /**
+   * This method sets the format in which the time is captured. The time is the time when this instance of MachineKey
+   * was generated. For e.g. HHmm to capture Hour and minute
+   *
+   * @param timeKey
+   *          the value of format
+   */
+  public void setTimeKey(String timeKey)
+  {
+    this.timeKey = timeKey;
+  }
+
+  /**
+   * This method returns the day of the month when this instance of MachineKey was generated
+   *
+   * @return
+   */
+  public String getDay()
+  {
+    return day;
+  }
+
+  /**
+   * This method sets the day of the month when this instance of MachineKey was generated
+   *
+   * @param day
+   *          the day of the month
+   */
+  public void setDay(String day)
+  {
+    this.day = day;
+  }
+
+  /**
+   * This method returns the customer Id
+   *
+   * @return
+   */
+  public Integer getCustomer()
+  {
+    return customer;
+  }
+
+  /**
+   * This method sets the customer Id
+   *
+   * @param customer
+   *          the customer Id
+   */
+  public void setCustomer(Integer customer)
+  {
+    this.customer = customer;
+  }
+
+  /**
+   * This method returns product on the device
+   *
+   * @return
+   */
+  public Integer getProduct()
+  {
+    return product;
+  }
+
+  /**
+   * This method sets the product on the device
+   *
+   * @param product
+   *          the value of product
+   */
+  public void setProduct(Integer product)
+  {
+    this.product = product;
+  }
+
+  /**
+   * This method returns the OS version on the device
+   *
+   * @return
+   */
+  public Integer getOs()
+  {
+    return os;
+  }
+
+  /**
+   * This method sets the OS version on the device
+   *
+   * @param os
+   *          OS version
+   */
+  public void setOs(Integer os)
+  {
+    this.os = os;
+  }
+
+  /**
+   * This method returns the version of the software1 on the device
+   *
+   * @return
+   */
+  public Integer getSoftware1()
+  {
+    return software1;
+  }
+
+  /**
+   * This method sets the version of the software1 on the device
+   *
+   * @param software1 the version of the software1
+   */
+  public void setSoftware1(Integer software1)
+  {
+    this.software1 = software1;
+  }
+
+  /**
+   * This method returns the version of the software2 on the device
+   *
+   * @return
+   */
+  public Integer getSoftware2()
+  {
+    return software2;
+  }
+
+  /**
+   * This method sets the version of the software2 on the device
+   *
+   * @param software2
+   *          the version of the software2
+   */
+  public void setSoftware2(Integer software2)
+  {
+    this.software2 = software2;
+  }
+
+  /**
+   * This method returns the version of the software3 on the device
+   *
+   * @return
+   */
+  public Integer getSoftware3()
+  {
+    return software3;
+  }
+
+  /**
+   * This method sets the version of the software3 on the device
+   *
+   * @param software3
+   *          the version of the software3
+   */
+  public void setSoftware3(Integer software3)
+  {
+    this.software3 = software3;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int key = 0;
+    if (customer != null) {
+      key |= (1 << 31);
+      key ^= customer;
+    }
+    if (product != null) {
+      key |= (1 << 30);
+      key ^= product;
+    }
+    if (os != null) {
+      key |= (1 << 29);
+      key ^= os;
+    }
+    if (software1 != null) {
+      key |= (1 << 28);
+      key ^= software1;
+    }
+    if (software2 != null) {
+      key |= (1 << 27);
+      key ^= software2;
+    }
+    if (software3 != null) {
+      key |= (1 << 26);
+      key ^= software3;
+    }
+    if (deviceId != null) {
+      key |= (1 << 25);
+      key ^= deviceId;
+    }
+    if (timeKey != null) {
+      key |= (1 << 24);
+      key ^= timeKey.hashCode();
+    }
+    if (day != null) {
+      key |= (1 << 23);
+      key ^= day.hashCode();
+    }
+
+    return key;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (!(obj instanceof MachineKey)) {
+      return false;
+    }
+    MachineKey mkey = (MachineKey)obj;
+    return checkStringEqual(this.timeKey, mkey.timeKey) && checkStringEqual(this.day, mkey.day) && checkIntEqual(this.customer, mkey.customer) && checkIntEqual(this.product, mkey.product) && checkIntEqual(this.os, mkey.os) && checkIntEqual(this.software1, mkey.software1) && checkIntEqual(this.software2, mkey.software2) && checkIntEqual(this.software3, mkey.software3) && checkIntEqual(this.deviceId, mkey.deviceId);
+  }
+
+  private boolean checkIntEqual(Integer a, Integer b)
+  {
+    if ((a == null) && (b == null)) {
+      return true;
+    }
+    if ((a != null) && a.equals(b)) {
+      return true;
+    }
+    return false;
+  }
+
+  private boolean checkStringEqual(String a, String b)
+  {
+    if ((a == null) && (b == null)) {
+      return true;
+    }
+    if ((a != null) && a.equals(b)) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder(timeKey);
+    if (customer != null) {
+      sb.append("|0:").append(customer);
+    }
+    if (product != null) {
+      sb.append("|1:").append(product);
+    }
+    if (os != null) {
+      sb.append("|2:").append(os);
+    }
+    if (software1 != null) {
+      sb.append("|3:").append(software1);
+    }
+    if (software2 != null) {
+      sb.append("|4:").append(software2);
+    }
+    if (software3 != null) {
+      sb.append("|5:").append(software3);
+    }
+    if (deviceId != null) {
+      sb.append("|6:").append(deviceId);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * This method returns the deviceId of the device
+   * @return The deviceId
+   */
+  public Integer getDeviceId()
+  {
+    return deviceId;
+  }
+
+  /**
+   * This method sets the deviceId of the device
+   *
+   * @param deviceId
+   */
+  public void setDeviceId(Integer deviceId)
+  {
+    this.deviceId = deviceId;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java
new file mode 100644
index 0000000..2fd457a
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.data;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+/**
+ * This class captures the resources whose usage is collected for each device
+ * <p>ResourceType class.</p>
+ *
+ * @since 0.3.5
+ */
+public enum ResourceType
+{
+
+  CPU("cpu"), RAM("ram"), HDD("hdd");
+
+  private static Map<String, ResourceType> descToResource = Maps.newHashMap();
+
+  static {
+    for (ResourceType type : ResourceType.values()) {
+      descToResource.put(type.desc, type);
+    }
+  }
+
+  private String desc;
+
+  private ResourceType(String desc)
+  {
+    this.desc = desc;
+  }
+
+  @Override
+  public String toString()
+  {
+    return desc;
+  }
+
+  /**
+   * This method returns ResourceType for the given description
+   * @param desc the description
+   * @return
+   */
+  public static ResourceType getResourceTypeOf(String desc)
+  {
+    return descToResource.get(desc);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java
new file mode 100644
index 0000000..34a9514
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.operator;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+import org.apache.apex.examples.machinedata.data.ResourceType;
+import org.apache.apex.examples.machinedata.util.DataTable;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * <p>
+ * CalculatorOperator class.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+public class CalculatorOperator extends BaseOperator
+{
+
+  private final DataTable<MachineKey, ResourceType, List<Integer>> data = new DataTable<>();
+
+  @Min(1)
+  @Max(99)
+  private int kthPercentile = 95; // kth percentile
+  private boolean computePercentile;
+  private boolean computeSD;
+  private boolean computeMax;
+
+  private int percentileThreshold = 80;
+  private int sdThreshold = 70;
+  private int maxThreshold = 99;
+
+  public final transient DefaultInputPort<MachineInfo> dataPort = new DefaultInputPort<MachineInfo>()
+  {
+    @Override
+    public void process(MachineInfo tuple)
+    {
+      addDataToCache(tuple);
+    }
+
+    /**
+     * Stream codec used for partitioning.
+     */
+    @Override
+    public StreamCodec<MachineInfo> getStreamCodec()
+    {
+      return new MachineInfoStreamCodec();
+    }
+  };
+
+  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> percentileOutputPort = new DefaultOutputPort<>();
+
+  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> sdOutputPort = new DefaultOutputPort<>();
+
+  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Integer>>> maxOutputPort = new DefaultOutputPort<>();
+
+  public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>();
+
+  private void addDataToCache(MachineInfo tuple)
+  {
+    MachineKey machineKey = tuple.getMachineKey();
+    if (!data.containsRow(machineKey)) {
+      data.put(machineKey, ResourceType.CPU, Lists.<Integer>newArrayList());
+      data.put(machineKey, ResourceType.RAM, Lists.<Integer>newArrayList());
+      data.put(machineKey, ResourceType.HDD, Lists.<Integer>newArrayList());
+    }
+    data.get(machineKey, ResourceType.CPU).add(tuple.getCpu());
+    data.get(machineKey, ResourceType.RAM).add(tuple.getRam());
+    data.get(machineKey, ResourceType.HDD).add(tuple.getHdd());
+  }
+
+  @Override
+  public void endWindow()
+  {
+
+    if (computePercentile) {
+      for (MachineKey machineKey : data.rowKeySet()) {
+        Collections.sort(data.get(machineKey, ResourceType.CPU));
+        Collections.sort(data.get(machineKey, ResourceType.RAM));
+        Collections.sort(data.get(machineKey, ResourceType.HDD));
+
+        Map<ResourceType, Double> percentileData = Maps.newHashMap();
+        percentileData.put(ResourceType.CPU, getKthPercentile(data.get(machineKey, ResourceType.CPU)));
+        percentileData.put(ResourceType.RAM, getKthPercentile(data.get(machineKey, ResourceType.RAM)));
+        percentileData.put(ResourceType.HDD, getKthPercentile(data.get(machineKey, ResourceType.HDD)));
+        percentileOutputPort.emit(new KeyValPair<>(machineKey, percentileData));
+
+        for (ResourceType resourceType : percentileData.keySet()) {
+          double percentileValue = percentileData.get(resourceType);
+          if (percentileValue > percentileThreshold) {
+            emitAlert(resourceType, machineKey, percentileValue, "Percentile");
+          }
+        }
+      }
+    }
+    if (computeSD) {
+      for (MachineKey machineKey : data.rowKeySet()) {
+
+        Map<ResourceType, Double> sdData = Maps.newHashMap();
+
+        for (ResourceType resourceType : ResourceType.values()) {
+          sdData.put(resourceType, getSD(data.get(machineKey, resourceType)));
+        }
+        sdOutputPort.emit(new KeyValPair<>(machineKey, sdData));
+
+        for (ResourceType resourceType : sdData.keySet()) {
+          double sdValue = sdData.get(resourceType);
+          if (sdValue > sdThreshold) {
+            emitAlert(resourceType, machineKey, sdValue, "SD");
+          }
+        }
+      }
+    }
+    if (computeMax) {
+      for (MachineKey machineKey : data.rowKeySet()) {
+
+        Map<ResourceType, Integer> maxData = Maps.newHashMap();
+        maxData.put(ResourceType.CPU, Collections.max(data.get(machineKey, ResourceType.CPU)));
+        maxData.put(ResourceType.RAM, Collections.max(data.get(machineKey, ResourceType.RAM)));
+        maxData.put(ResourceType.HDD, Collections.max(data.get(machineKey, ResourceType.HDD)));
+
+        maxOutputPort.emit(new KeyValPair<>(machineKey, maxData));
+
+        for (ResourceType resourceType : maxData.keySet()) {
+          double sdValue = maxData.get(resourceType).doubleValue();
+          if (sdValue > maxThreshold) {
+            emitAlert(resourceType, machineKey, sdValue, "Max");
+          }
+        }
+      }
+    }
+    data.clear();
+  }
+
+  private void emitAlert(ResourceType type, MachineKey machineKey, double alertVal, String prefix)
+  {
+    BigDecimal decimalVal = new BigDecimal(alertVal);
+    decimalVal = decimalVal.setScale(2, BigDecimal.ROUND_HALF_UP);
+    String alertTime = machineKey.getDay() + machineKey.getTimeKey();
+    smtpAlert.emit(prefix + "-" + type.toString().toUpperCase() + " alert at " + alertTime + " " + type + " usage breached current usage: " + decimalVal.doubleValue() + "% threshold: " + percentileThreshold + "%\n\n" + machineKey);
+  }
+
+  private double getKthPercentile(List<Integer> sorted)
+  {
+
+    double val = (kthPercentile * sorted.size()) / 100.0;
+    if (val == (int)val) {
+      // Whole number
+      int idx = (int)val - 1;
+      return (sorted.get(idx) + sorted.get(idx + 1)) / 2.0;
+    } else {
+      int idx = (int)Math.round(val) - 1;
+      return sorted.get(idx);
+    }
+  }
+
+  private double getSD(List<Integer> data)
+  {
+    int sum = 0;
+    for (int i : data) {
+      sum += i;
+    }
+    double avg = sum / (data.size() * 1.0);
+    double sd = 0;
+    for (Integer point : data) {
+      sd += Math.pow(point - avg, 2);
+    }
+    return Math.sqrt(sd);
+  }
+
+  /**
+   * @param kVal the percentile which will be emitted by this operator
+   */
+  public void setKthPercentile(int kVal)
+  {
+    this.kthPercentile = kVal;
+  }
+
+  /**
+   * @param doCompute when true percentile will be computed
+   */
+  public void setComputePercentile(boolean doCompute)
+  {
+    this.computePercentile = doCompute;
+  }
+
+  /**
+   * @param doCompute when true standard deviation will be computed
+   */
+  public void setComputeSD(boolean doCompute)
+  {
+    this.computeSD = doCompute;
+  }
+
+  /**
+   * @param doCompute when true max will be computed
+   */
+  public void setComputeMax(boolean doCompute)
+  {
+    this.computeMax = doCompute;
+  }
+
+  /**
+   * @param threshold for percentile when breached will cause alert
+   */
+  public void setPercentileThreshold(int threshold)
+  {
+    this.percentileThreshold = threshold;
+  }
+
+  /**
+   * @param threshold for standard deviation when breached will cause alert
+   */
+  public void setSDThreshold(int threshold)
+  {
+    this.sdThreshold = threshold;
+  }
+
+  /**
+   * @param threshold for Max when breached will cause alert
+   */
+  public void setMaxThreshold(int threshold)
+  {
+    this.maxThreshold = threshold;
+  }
+
+  public static class MachineInfoStreamCodec extends KryoSerializableStreamCodec<MachineInfo> implements Serializable
+  {
+    public MachineInfoStreamCodec()
+    {
+      super();
+    }
+
+    @Override
+    public int getPartition(MachineInfo o)
+    {
+      return Objects.hashCode(o.getMachineKey().getCustomer(), o.getMachineKey().getOs(), o.getMachineKey().getProduct(), o.getMachineKey().getSoftware1(), o.getMachineKey().getSoftware2(), o.getMachineKey().getSoftware3());
+    }
+
+    private static final long serialVersionUID = 201411031403L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java
new file mode 100644
index 0000000..1de676f
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.operator;
+
+import java.math.BigDecimal;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.examples.machinedata.data.AverageData;
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.util.KeyHashValPair;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This class calculates the average for various resources across different devices for a given key
+ * <p>MachineInfoAveragingOperator class.</p>
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings("unused")
+public class MachineInfoAveragingOperator extends BaseOperator
+{
+
+  public static final String CPU = "cpu";
+  public static final String RAM = "ram";
+  public static final String HDD = "hdd";
+  public static final String DAY = "day";
+
+  private final transient Map<MachineKey, AverageData> dataMap = new HashMap<>();
+
+  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>> outputPort = new DefaultOutputPort<>();
+
+  public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>();
+
+  private int threshold = 95;
+
+  /**
+   * Buffer all the tuples as is till end window gets called
+   */
+  public final transient DefaultInputPort<KeyHashValPair<MachineKey, AverageData>> inputPort = new DefaultInputPort<KeyHashValPair<MachineKey, AverageData>>()
+  {
+
+    @Override
+    public void process(KeyHashValPair<MachineKey, AverageData> tuple)
+    {
+      addTuple(tuple);
+    }
+  };
+
+  /**
+   * This method returns the threshold value
+   *
+   * @return
+   */
+  public int getThreshold()
+  {
+    return threshold;
+  }
+
+  /**
+   * This method sets the threshold value. If the average usage for any Resource is above this for a given key, then the alert is sent
+   *
+   * @param threshold the threshold value
+   */
+  public void setThreshold(int threshold)
+  {
+    this.threshold = threshold;
+  }
+
+  /**
+   * This adds the given tuple to the dataMap
+   *
+   * @param tuple input tuple
+   */
+  private void addTuple(KeyHashValPair<MachineKey, AverageData> tuple)
+  {
+    MachineKey key = tuple.getKey();
+    dataMap.put(key, tuple.getValue());
+  }
+
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<MachineKey, AverageData> entry : dataMap.entrySet()) {
+      MachineKey key = entry.getKey();
+      AverageData averageResultMap = entry.getValue();
+      Map<String, String> averageResult = Maps.newHashMap();
+      long count = averageResultMap.getCount();
+      double average = averageResultMap.getCpu() / count;
+      averageResult.put(CPU, average + "");
+      emitAlert(average, CPU, key);
+      average = averageResultMap.getHdd() / count;
+      averageResult.put(HDD, average + "");
+      emitAlert(average, HDD, key);
+      average = averageResultMap.getRam() / count;
+      averageResult.put(RAM, average + "");
+      emitAlert(average, RAM, key);
+      averageResult.put(DAY, key.getDay());
+      outputPort.emit(new KeyValPair<>(key, averageResult));
+    }
+    dataMap.clear();
+  }
+
+  private void emitAlert(double average, String resourceType, MachineKey key)
+  {
+    if (average > threshold) {
+      BigDecimal bd = new BigDecimal(average);
+      bd = bd.setScale(2, BigDecimal.ROUND_HALF_UP);
+      String stime = key.getDay() + key.getTimeKey();
+      String skey = getKeyInfo(key);
+      smtpAlert.emit(resourceType.toUpperCase() + " alert at " + stime + " " + resourceType + " usage breached current usage: " + bd.doubleValue() + "% threshold: " + threshold + "%\n\n" + skey);
+    }
+  }
+
+  /**
+   * This method is used to artificially generate alerts
+   *
+   * @param genAlert
+   */
+  public void setGenAlert(boolean genAlert)
+  {
+    Calendar calendar = Calendar.getInstance();
+    long timestamp = System.currentTimeMillis();
+    calendar.setTimeInMillis(timestamp);
+    DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
+    Date date = calendar.getTime();
+    String timeKey = minuteDateFormat.format(date);
+    DateFormat dayDateFormat = new SimpleDateFormat("dd");
+    String day = dayDateFormat.format(date);
+
+    MachineKey alertKey = new MachineKey(timeKey, day);
+    alertKey.setCustomer(1);
+    alertKey.setProduct(5);
+    alertKey.setOs(10);
+    alertKey.setSoftware1(12);
+    alertKey.setSoftware2(14);
+    alertKey.setSoftware3(6);
+
+    MachineInfo machineInfo = new MachineInfo();
+    machineInfo.setMachineKey(alertKey);
+    machineInfo.setCpu(threshold + 1);
+    machineInfo.setRam(threshold + 1);
+    machineInfo.setHdd(threshold + 1);
+
+    smtpAlert.emit("CPU Alert: CPU Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
+    smtpAlert.emit("RAM Alert: RAM Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
+    smtpAlert.emit("HDD Alert: HDD Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
+  }
+
+  /**
+   * This method returns the String for a given MachineKey instance
+   *
+   * @param key MachineKey instance that needs to be converted to string
+   * @return
+   */
+  private String getKeyInfo(MachineKey key)
+  {
+    StringBuilder sb = new StringBuilder();
+    if (key instanceof MachineKey) {
+      MachineKey mkey = (MachineKey)key;
+      Integer customer = mkey.getCustomer();
+      if (customer != null) {
+        sb.append("customer: " + customer + "\n");
+      }
+      Integer product = mkey.getProduct();
+      if (product != null) {
+        sb.append("product version: " + product + "\n");
+      }
+      Integer os = mkey.getOs();
+      if (os != null) {
+        sb.append("os version: " + os + "\n");
+      }
+      Integer software1 = mkey.getSoftware1();
+      if (software1 != null) {
+        sb.append("software1 version: " + software1 + "\n");
+      }
+      Integer software2 = mkey.getSoftware2();
+      if (software2 != null) {
+        sb.append("software2 version: " + software2 + "\n");
+      }
+      Integer software3 = mkey.getSoftware3();
+      if (software3 != null) {
+        sb.append("software3 version: " + software3 + "\n");
+      }
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
new file mode 100644
index 0000000..14c8d25
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.examples.machinedata.data.AverageData;
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * This class calculates the partial sum and count for tuples generated by upstream Operator
+ * <p> MachineInfoAveragingPrerequisitesOperator class. </p>
+ *
+ * @since 0.3.5
+ */
+public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator
+{
+
+  // Aggregate sum of all values seen for a key.
+  private Map<MachineKey, AverageData> sums = new HashMap<>();
+
+  public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>>()
+  {
+    @Override
+    public Unifier<KeyHashValPair<MachineKey, AverageData>> getUnifier()
+    {
+      MachineInfoAveragingUnifier unifier = new MachineInfoAveragingUnifier();
+      return unifier;
+    }
+  };
+
+  public transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
+  {
+
+    @Override
+    public void process(MachineInfo tuple)
+    {
+      MachineKey key = tuple.getMachineKey();
+      AverageData averageData = sums.get(key);
+      if (averageData == null) {
+        averageData = new AverageData(tuple.getCpu(), tuple.getHdd(), tuple.getRam(), 1);
+        sums.put(key, averageData);
+      } else {
+        averageData.setCpu(averageData.getCpu() + tuple.getCpu());
+        averageData.setRam(averageData.getRam() + tuple.getRam());
+        averageData.setHdd(averageData.getHdd() + tuple.getHdd());
+        averageData.setCount(averageData.getCount() + 1);
+      }
+    }
+  };
+
+  @Override
+  public void endWindow()
+  {
+
+    for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) {
+      if (outputPort.isConnected()) {
+        outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue()));
+      }
+    }
+    sums.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java
new file mode 100644
index 0000000..02aabe7
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.examples.machinedata.data.AverageData;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.Unifier;
+
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * This class calculates the partial sum and count for a given key
+ * <p>MachineInfoAveragingUnifier class.</p>
+ *
+ * @since 0.9.0
+ */
+public class MachineInfoAveragingUnifier implements Unifier<KeyHashValPair<MachineKey, AverageData>>
+{
+
+  private Map<MachineKey, AverageData> sums = new HashMap<>();
+  public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<>();
+
+  @Override
+  public void beginWindow(long arg0)
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) {
+      outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue()));
+    }
+    sums.clear();
+
+  }
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void teardown()
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void process(KeyHashValPair<MachineKey, AverageData> arg0)
+  {
+    MachineKey tupleKey = arg0.getKey();
+    AverageData averageData = sums.get(tupleKey);
+    AverageData tupleValue = arg0.getValue();
+    if (averageData == null) {
+      sums.put(tupleKey, tupleValue);
+    } else {
+      averageData.setCpu(averageData.getCpu() + tupleValue.getCpu());
+      averageData.setRam(averageData.getRam() + tupleValue.getRam());
+      averageData.setHdd(averageData.getHdd() + tupleValue.getHdd());
+      averageData.setCount(averageData.getCount() + tupleValue.getCount());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java
new file mode 100644
index 0000000..0ff8985
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generate combinations of elements for the given array of elements.
+ *
+ * Implements nCr = n! / (r! * (n-r)!)
+ *
+ * @since 0.3.5
+ */
+public class Combinatorics<T>
+{
+
+  private T[] values;
+  private int size = -1;
+  private List<T> result;
+  private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>();
+  private int resultMapSize = 0;
+
+  /**
+   * Generates all possible combinations with all the sizes.
+   *
+   * @param values
+   */
+  public Combinatorics(T[] values)
+  {
+    this.values = values;
+    this.size = -1;
+    this.result = new ArrayList<>();
+  }
+
+  /**
+   * Generates all possible combinations with the given size.
+   *
+   * @param values
+   * @param size
+   */
+  public Combinatorics(T[] values, int size)
+  {
+    this.values = values;
+    this.size = size;
+    this.result = new ArrayList<>();
+  }
+
+  public Map<Integer, List<T>> generate()
+  {
+
+    if (size == -1) {
+      size = values.length;
+      for (int i = 1; i <= size; i++) {
+        int[] tmp = new int[i];
+        Arrays.fill(tmp, -1);
+        generateCombinations(0, 0, tmp);
+      }
+    } else {
+      int[] tmp = new int[size];
+      Arrays.fill(tmp, -1);
+      generateCombinations(0, 0, tmp);
+    }
+    return resultMap;
+  }
+
+  public void generateCombinations(int start, int depth, int[] tmp)
+  {
+    if (depth == tmp.length) {
+      for (int j = 0; j < depth; j++) {
+        result.add(values[tmp[j]]);
+      }
+      resultMap.put(++resultMapSize, result);
+      result = new ArrayList<>();
+      return;
+    }
+    for (int i = start; i < values.length; i++) {
+      tmp[depth] = i;
+      generateCombinations(i + 1, depth + 1, tmp);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java
new file mode 100644
index 0000000..a147268
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.util;
+
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Maps;
+
+/**
+ * <p>DataTable class.</p>
+ *
+ * @since 0.3.5
+ */
+public class DataTable<R,C,E>
+{
+
+  //machineKey, [cpu,ram,hdd] -> value
+  private final Map<R,Map<C,E>> table = Maps.newHashMap();
+
+  public boolean containsRow(R rowKey)
+  {
+    return table.containsKey(rowKey);
+  }
+
+  public void put(R rowKey,C colKey, E entry)
+  {
+    if (!containsRow(rowKey)) {
+      table.put(rowKey, Maps.<C,E>newHashMap());
+    }
+    table.get(rowKey).put(colKey, entry);
+  }
+
+  @Nullable
+  public E get(R rowKey, C colKey)
+  {
+    if (!containsRow(rowKey)) {
+      return null;
+    }
+    return table.get(rowKey).get(colKey);
+  }
+
+  public Set<R> rowKeySet()
+  {
+    return table.keySet();
+  }
+
+  public void clear()
+  {
+    table.clear();
+  }
+
+  public Map<R,Map<C,E>> getTable()
+  {
+    return table;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/resources/META-INF/properties.xml b/examples/machinedata/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..4ceb6b9
--- /dev/null
+++ b/examples/machinedata/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,139 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <property>
+    <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
+    <value>1000</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Receiver.attr.APPLICATION_WINDOW_COUNT
+    </name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Receiver.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.DimensionsGenerator.attr.APPLICATION_WINDOW_COUNT
+    </name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.stream.Events.locality
+    </name>
+    <value>CONTAINER_LOCAL</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.DimensionsGenerator.inputport.inputPort.attr.PARTITION_PARALLEL
+    </name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Aggregator.inputport.inputPort.attr.PARTITION_PARALLEL
+    </name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Aggregator.attr.APPLICATION_WINDOW_COUNT
+    </name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.AverageCalculator.attr.APPLICATION_WINDOW_COUNT
+    </name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Persister.inputport.input.attr.PARTITION_PARALLEL
+    </name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Persister.attr.APPLICATION_WINDOW_COUNT
+    </name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Persister.store.dbIndex
+    </name>
+    <value>2</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Persister.store.host
+    </name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Persister.store.port
+    </name>
+    <value>6379</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.port.*.attr.QUEUE_CAPACITY</name>
+    <value>32000</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Alerter.from
+    </name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Alerter.subject
+    </name>
+    <value>Alert!!!</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Alerter.recipients.TO
+    </name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Alerter.content
+    </name>
+    <value>{}</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Alerter.smtpHost
+    </name>
+    <value>localhost</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Alerter.smtpPort
+    </name>
+    <value>25</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Alerter.useSsl
+    </name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.operator.Aggregator.outputport.outputPort.attr.UNIFIER_LIMIT
+    </name>
+    <value>8</value>
+  </property>
+  <property>
+    <name>dt.application.MachineDataExample.stream.DimensionalData.locality
+    </name>
+    <value>THREAD_LOCAL</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java b/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java
new file mode 100644
index 0000000..c69a2aa
--- /dev/null
+++ b/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+import org.apache.apex.examples.machinedata.data.ResourceType;
+import org.apache.apex.examples.machinedata.operator.CalculatorOperator;
+
+
+import com.google.common.collect.ImmutableList;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.TimeBucketKey;
+
+/**
+ * @since 0.3.5
+ */
+public class CalculatorOperatorTest
+{
+  private static DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
+  private static Logger LOG = LoggerFactory.getLogger(CalculatorOperatorTest.class);
+
+  /**
+   * Test node logic emits correct results
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    CalculatorOperator calculatorOperator = new CalculatorOperator();
+    calculatorOperator.setup(null);
+
+    calculatorOperator.setComputePercentile(true);
+    calculatorOperator.setComputeMax(true);
+    calculatorOperator.setComputeSD(true);
+
+    testPercentile(calculatorOperator);
+  }
+
+  public void testPercentile(CalculatorOperator oper)
+  {
+
+    CollectorTestSink sortSink = new CollectorTestSink();
+    oper.percentileOutputPort.setSink(sortSink);
+    oper.setKthPercentile(50);
+    Calendar calendar = Calendar.getInstance();
+    Date date = calendar.getTime();
+    String timeKey = minuteDateFormat.format(date);
+    String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
+
+    Integer vs = new Integer(1);
+    MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
+
+    oper.beginWindow(0);
+
+    MachineInfo info = new MachineInfo(mk, 1, 1, 1);
+    oper.dataPort.process(info);
+
+    info.setCpu(2);
+    oper.dataPort.process(info);
+
+    info.setCpu(3);
+    oper.dataPort.process(info);
+
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
+    for (Object o : sortSink.collectedTuples) {
+      LOG.debug(o.toString());
+      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
+      Assert.assertEquals("emitted value for 'cpu' was ", 2.0, keyValPair.getValue().get(ResourceType.CPU), 0);
+      Assert.assertEquals("emitted value for 'hdd' was ", 1.0, keyValPair.getValue().get(ResourceType.HDD), 0);
+      Assert.assertEquals("emitted value for 'ram' was ", 1.0, keyValPair.getValue().get(ResourceType.RAM), 0);
+
+    }
+    LOG.debug("Done percentile testing\n");
+
+  }
+
+  public void testStandarDeviation(CalculatorOperator oper)
+  {
+    CollectorTestSink sortSink = new CollectorTestSink();
+    oper.sdOutputPort.setSink(sortSink);
+    Calendar calendar = Calendar.getInstance();
+    Date date = calendar.getTime();
+    String timeKey = minuteDateFormat.format(date);
+    String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
+
+    Integer vs = new Integer(1);
+    MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
+
+    oper.beginWindow(0);
+
+    MachineInfo info = new MachineInfo(mk, 1, 1, 1);
+    oper.dataPort.process(info);
+
+    info.setCpu(2);
+    oper.dataPort.process(info);
+
+    info.setCpu(3);
+    oper.dataPort.process(info);
+
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
+    for (Object o : sortSink.collectedTuples) {
+      LOG.debug(o.toString());
+      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
+      Assert.assertEquals("emitted value for 'cpu' was ", getSD(ImmutableList.of(1, 2, 3)), keyValPair.getValue().get(ResourceType.CPU), 0);
+      Assert.assertEquals("emitted value for 'hdd' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.HDD), 0);
+      Assert.assertEquals("emitted value for 'ram' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.RAM), 0);
+
+    }
+    LOG.debug("Done sd testing\n");
+
+  }
+
+  private final double getSD(List<Integer> input)
+  {
+    int sum = 0;
+    for (int i : input) {
+      sum += i;
+    }
+    double avg = sum / (input.size() * 1.0);
+    double sd = 0;
+    for (Integer point : input) {
+      sd += Math.pow(point - avg, 2);
+    }
+    return Math.sqrt(sd);
+  }
+
+  public void testMax(CalculatorOperator oper)
+  {
+    CollectorTestSink sortSink = new CollectorTestSink();
+    oper.maxOutputPort.setSink(sortSink);
+    Calendar calendar = Calendar.getInstance();
+    Date date = calendar.getTime();
+    String timeKey = minuteDateFormat.format(date);
+    String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
+
+    Integer vs = new Integer(1);
+    MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
+
+    oper.beginWindow(0);
+
+    MachineInfo info = new MachineInfo(mk, 1, 1, 1);
+    oper.dataPort.process(info);
+
+    info.setCpu(2);
+    oper.dataPort.process(info);
+
+    info.setCpu(3);
+    oper.dataPort.process(info);
+
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
+    for (Object o : sortSink.collectedTuples) {
+      LOG.debug(o.toString());
+      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
+      Assert.assertEquals("emitted value for 'cpu' was ", 3, keyValPair.getValue().get(ResourceType.CPU), 0);
+      Assert.assertEquals("emitted value for 'hdd' was ", 1, keyValPair.getValue().get(ResourceType.HDD), 0);
+      Assert.assertEquals("emitted value for 'ram' was ", 1, keyValPair.getValue().get(ResourceType.RAM), 0);
+
+    }
+    LOG.debug("Done max testing\n");
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/test/resources/log4j.properties b/examples/machinedata/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/machinedata/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/pom.xml
----------------------------------------------------------------------
diff --git a/examples/mobile/pom.xml b/examples/mobile/pom.xml
new file mode 100644
index 0000000..cb40887
--- /dev/null
+++ b/examples/mobile/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-mobile</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar Mobile Example</name>
+  <description></description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+  <dependencies>
+    <!-- add your dependencies here -->
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>8.1.10.v20130312</version>
+      <scope>test</scope>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-websocket</artifactId>
+      <version>8.1.10.v20130312</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.1</version>
+      <type>jar</type>
+    </dependency>
+  </dependencies>
+
+</project>