You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:28 UTC

[23/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
deleted file mode 100644
index 55b299f..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingOperator;
-import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingPrerequisitesOperator;
-import com.datatorrent.lib.io.SmtpOutputOperator;
-
-/**
- * <p>
- * Resource monitor application.
- * </p>
- *
- * @since 0.3.5
- */
-@ApplicationAnnotation(name = "MachineDataDemo")
-@SuppressWarnings("unused")
-public class Application implements StreamingApplication
-{
-
-  private static final Logger LOG = LoggerFactory.getLogger(Application.class);
-
-  /**
-   * This function sets up the DAG for calculating the average
-   *
-   * @param dag  the DAG instance
-   * @param conf the configuration instance
-   * @return MachineInfoAveragingPrerequisitesOperator
-   */
-  private MachineInfoAveragingPrerequisitesOperator addAverageCalculation(DAG dag, Configuration conf)
-  {
-    MachineInfoAveragingPrerequisitesOperator prereqAverageOper = dag.addOperator("Aggregator", MachineInfoAveragingPrerequisitesOperator.class);
-    MachineInfoAveragingOperator averageOperator = dag.addOperator("AverageCalculator", MachineInfoAveragingOperator.class);
-    RedisKeyValPairOutputOperator<MachineKey, Map<String, String>> redisAvgOperator = dag.addOperator("Persister", new RedisKeyValPairOutputOperator<MachineKey, Map<String, String>>());
-    dag.addStream("Average", averageOperator.outputPort, redisAvgOperator.input);
-    SmtpOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpOutputOperator());
-    dag.addStream("Aggregates", prereqAverageOper.outputPort, averageOperator.inputPort);
-    dag.addStream("Alerts", averageOperator.smtpAlert, smtpOutputOperator.input);
-    return prereqAverageOper;
-  }
-
-  /**
-   * Create the DAG
-   */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    InputReceiver randomGen = dag.addOperator("Receiver", InputReceiver.class);
-    DimensionGenerator dimensionGenerator = dag.addOperator("DimensionsGenerator", DimensionGenerator.class);
-    dag.addStream("Events", randomGen.outputInline, dimensionGenerator.inputPort);
-    MachineInfoAveragingPrerequisitesOperator prereqAverageOper = addAverageCalculation(dag, conf);
-    dag.addStream("DimensionalData", dimensionGenerator.outputInline, prereqAverageOper.inputPort);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
deleted file mode 100644
index 75c2a02..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-
-/**
- * <p>
- * Information tuple generator with randomness.
- * </p>
- *
- * @since 0.3.5
- */
-@SuppressWarnings("unused")
-public class DimensionGenerator extends BaseOperator
-{
-  public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
-  public transient DefaultOutputPort<MachineInfo> output = new DefaultOutputPort<>();
-  private int threshold = 90;
-
-  public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
-  {
-
-    @Override
-    public void process(MachineInfo tuple)
-    {
-      emitDimensions(tuple);
-    }
-
-  };
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-    super.setup(context);
-  }
-
-  /**
-   * This returns the threshold value set
-   * @return
-   */
-  public int getThreshold()
-  {
-    return threshold;
-  }
-
-  /**
-   * This function sets the threshold value. This value is used to check the maximum value for cpu/ram/hdd
-   * @param threshold
-   */
-  public void setThreshold(int threshold)
-  {
-    this.threshold = threshold;
-  }
-
-  /**
-   * This function takes in the tuple from upstream operator and generates tuples with different dimension combinations
-   *
-   * @param tuple
-   */
-  private void emitDimensions(MachineInfo tuple)
-  {
-    MachineKey tupleKey = tuple.getMachineKey();
-
-    for (int i = 0; i < 64; i++) {
-      MachineKey machineKey = new MachineKey(tupleKey.getTimeKey(),tupleKey.getDay());
-      if ((i & 1) != 0) {
-        machineKey.setCustomer(tupleKey.getCustomer());
-      }
-      if ((i & 2) != 0) {
-        machineKey.setProduct(tupleKey.getProduct());
-      }
-      if ((i & 4) != 0) {
-        machineKey.setOs(tupleKey.getOs());
-      }
-      if ((i & 8) != 0) {
-        machineKey.setDeviceId(tupleKey.getDeviceId());
-      }
-      if ((i & 16) != 0) {
-        machineKey.setSoftware1(tupleKey.getSoftware1());
-      }
-      if ((i & 32) != 0) {
-        machineKey.setSoftware2(tupleKey.getSoftware2());
-      }
-
-      int cpu = tuple.getCpu();
-      int ram = tuple.getRam();
-      int hdd = tuple.getHdd();
-      MachineInfo machineInfo = new MachineInfo();
-      machineInfo.setMachineKey(machineKey);
-      machineInfo.setCpu((cpu < threshold) ? cpu : threshold);
-      machineInfo.setRam((ram < threshold) ? ram : threshold);
-      machineInfo.setHdd((hdd < threshold) ? hdd : threshold);
-      outputInline.emit(machineInfo);
-      output.emit(machineInfo);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
deleted file mode 100644
index 85ec954..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Random;
-import java.util.TimeZone;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-
-/**
- * <p>
- * Information tuple generator with randomness.
- * </p>
- *
- * @since 0.3.5
- */
-@SuppressWarnings("unused")
-public class InputReceiver extends BaseOperator implements InputOperator
-{
-  private static final Logger logger = LoggerFactory.getLogger(InputReceiver.class);
-
-  public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
-  private final Random randomGen = new Random();
-
-  private int customerMin = 1;
-  private int customerMax = 5;
-  private int productMin = 4;
-  private int productMax = 6;
-  private int osMin = 10;
-  private int osMax = 12;
-  private int software1Min = 10;
-  private int software1Max = 12;
-  private int software2Min = 12;
-  private int software2Max = 14;
-  private int software3Min = 4;
-  private int software3Max = 6;
-
-  private int deviceIdMin = 1;
-  private int deviceIdMax = 50;
-
-  private int tupleBlastSize = 1001;
-
-  private static final DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
-  private static final DateFormat dayDateFormat = new SimpleDateFormat("d");
-
-  static {
-    TimeZone tz = TimeZone.getTimeZone("GMT");
-    minuteDateFormat.setTimeZone(tz);
-    dayDateFormat.setTimeZone(tz);
-
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-    super.setup(context);
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    super.beginWindow(windowId);
-  }
-
-  @Override
-  public void emitTuples()
-  {
-    int count = 0;
-    Calendar calendar = Calendar.getInstance();
-    Date date = calendar.getTime();
-    String timeKey = minuteDateFormat.format(date);
-    String day = dayDateFormat.format(date);
-
-    while (count < tupleBlastSize) {
-      randomGen.setSeed(System.currentTimeMillis());
-
-      int customerVal = genCustomerId();
-      int productVal = genProductVer();
-      int osVal = genOsVer();
-      int software1Val = genSoftware1Ver();
-      int software2Val = genSoftware2Ver();
-      int software3Val = genSoftware3Ver();
-      int deviceIdVal = genDeviceId();
-
-      int cpuVal = genCpu(calendar);
-      int ramVal = genRam(calendar);
-      int hddVal = genHdd(calendar);
-
-      MachineKey machineKey = new MachineKey(timeKey, day);
-
-      machineKey.setCustomer(customerVal);
-      machineKey.setProduct(productVal);
-      machineKey.setOs(osVal);
-      machineKey.setDeviceId(deviceIdVal);
-      machineKey.setSoftware1(software1Val);
-      machineKey.setSoftware2(software2Val);
-      machineKey.setSoftware3(software3Val);
-      MachineInfo machineInfo = new MachineInfo();
-      machineInfo.setMachineKey(machineKey);
-      machineInfo.setCpu(cpuVal);
-      machineInfo.setRam(ramVal);
-      machineInfo.setHdd(hddVal);
-
-      outputInline.emit(machineInfo);
-
-      count++;
-    }
-  }
-
-  private int genCustomerId()
-  {
-    int range = customerMax - customerMin + 1;
-    return customerMin + randomGen.nextInt(range);
-  }
-
-  private int genProductVer()
-  {
-    int range = productMax - productMin + 1;
-    return productMin + randomGen.nextInt(range);
-  }
-
-  private int genOsVer()
-  {
-    int range = osMax - osMin + 1;
-    return osMin + randomGen.nextInt(range);
-  }
-
-  private int genSoftware3Ver()
-  {
-    int range = software3Max - software3Min + 1;
-    return software3Min + randomGen.nextInt(range);
-  }
-
-  private int genDeviceId()
-  {
-    int range = deviceIdMax - deviceIdMin + 1;
-    return deviceIdMin + randomGen.nextInt(range);
-  }
-
-  private int genSoftware1Ver()
-  {
-    int range = software1Max - software1Min + 1;
-    return software1Min + randomGen.nextInt(range);
-  }
-
-  private int genSoftware2Ver()
-  {
-    int range = software2Max - software2Min + 1;
-    return software2Min + randomGen.nextInt(range);
-  }
-
-  private int genCpu(Calendar cal)
-  {
-    int minute = cal.get(Calendar.MINUTE);
-    int second;
-    int range = minute / 2 + 19;
-    if (minute / 17 == 0) {
-      second = cal.get(Calendar.SECOND);
-      return (30 + randomGen.nextInt(range) + (minute % 7) - (second % 11));
-    } else if (minute / 47 == 0) {
-      second = cal.get(Calendar.SECOND);
-      return (7 + randomGen.nextInt(range) + (minute % 7) - (second % 7));
-    } else {
-      second = cal.get(Calendar.SECOND);
-      return (randomGen.nextInt(range) + (minute % 19) + (second % 7));
-    }
-  }
-
-  private int genRam(Calendar cal)
-  {
-    int minute = cal.get(Calendar.MINUTE);
-    int second;
-    int range = minute + 1;
-    if (minute / 23 == 0) {
-      second = cal.get(Calendar.SECOND);
-      return (20 + randomGen.nextInt(range) + (minute % 5) - (second % 11));
-    } else if (minute / 37 == 0) {
-      second = cal.get(Calendar.SECOND);
-      return (11 + randomGen.nextInt(60) - (minute % 5) - (second % 11));
-    } else {
-      second = cal.get(Calendar.SECOND);
-      return (randomGen.nextInt(range) + (minute % 17) + (second % 11));
-    }
-  }
-
-  private int genHdd(Calendar cal)
-  {
-    int minute = cal.get(Calendar.MINUTE);
-    int second;
-    int range = minute / 2 + 1;
-    if (minute / 37 == 0) {
-      second = cal.get(Calendar.SECOND);
-      return (25 + randomGen.nextInt(range) - minute % 7 - second % 11);
-    } else {
-      second = cal.get(Calendar.SECOND);
-      return (randomGen.nextInt(range) + minute % 23 + second % 11);
-    }
-  }
-
-  /**
-   * This method returns the minimum value for customer
-   *
-   * @return
-   */
-  public int getCustomerMin()
-  {
-    return customerMin;
-  }
-
-  /**
-   * This method is used to set the minimum value for customer
-   *
-   * @param customerMin the minimum customer value
-   */
-  public void setCustomerMin(int customerMin)
-  {
-    this.customerMin = customerMin;
-  }
-
-  /**
-   * This method returns the max value for customer
-   *
-   * @return
-   */
-  public int getCustomerMax()
-  {
-    return customerMax;
-  }
-
-  /**
-   * This method is used to set the max value for customer
-   *
-   * @param customerMax the max customer value
-   */
-  public void setCustomerMax(int customerMax)
-  {
-    this.customerMax = customerMax;
-  }
-
-  /**
-   * This method returns the minimum value for product
-   *
-   * @return
-   */
-  public int getProductMin()
-  {
-    return productMin;
-  }
-
-  /**
-   * This method is used to set the minimum value for product
-   *
-   * @param productMin the minimum product value
-   */
-  public void setProductMin(int productMin)
-  {
-    this.productMin = productMin;
-  }
-
-  /**
-   * This method returns the max value for product
-   *
-   * @return
-   */
-  public int getProductMax()
-  {
-    return productMax;
-  }
-
-  /**
-   * This method is used to set the max value for product
-   *
-   * @param productMax the max product value
-   */
-  public void setProductMax(int productMax)
-  {
-    this.productMax = productMax;
-  }
-
-  /**
-   * This method returns the minimum value for OS
-   *
-   * @return
-   */
-  public int getOsMin()
-  {
-    return osMin;
-  }
-
-  /**
-   * This method is used to set the minimum value for OS
-   *
-   * @param osMin the min OS value
-   */
-  public void setOsMin(int osMin)
-  {
-    this.osMin = osMin;
-  }
-
-  /**
-   * This method returns the max value for OS
-   *
-   * @return
-   */
-  public int getOsMax()
-  {
-    return osMax;
-  }
-
-  /**
-   * This method is used to set the max value for OS
-   *
-   * @param osMax the max OS value
-   */
-  public void setOsMax(int osMax)
-  {
-    this.osMax = osMax;
-  }
-
-  /**
-   * This method returns the minimum value for software1
-   *
-   * @return
-   */
-  public int getSoftware1Min()
-  {
-    return software1Min;
-  }
-
-  /**
-   * This method is used to set the minimum value for software1
-   *
-   * @param software1Min the minimum software1 value
-   */
-  public void setSoftware1Min(int software1Min)
-  {
-    this.software1Min = software1Min;
-  }
-
-  /**
-   * This method returns the max value for software1
-   *
-   * @return
-   */
-  public int getSoftware1Max()
-  {
-    return software1Max;
-  }
-
-  /**
-   * This method is used to set the max value for software1
-   *
-   * @param software1Max the max software1 value
-   */
-  public void setSoftware1Max(int software1Max)
-  {
-    this.software1Max = software1Max;
-  }
-
-  /**
-   * This method returns the minimum value for software2
-   *
-   * @return
-   */
-  public int getSoftware2Min()
-  {
-    return software2Min;
-  }
-
-  /**
-   * This method is used to set the minimum value for software2
-   *
-   * @param software2Min the minimum software2 value
-   */
-  public void setSoftware2Min(int software2Min)
-  {
-    this.software2Min = software2Min;
-  }
-
-  /**
-   * This method returns the max value for software2
-   *
-   * @return
-   */
-  public int getSoftware2Max()
-  {
-    return software2Max;
-  }
-
-  /**
-   * This method is used to set the max value for software2
-   *
-   * @param software2Max the max software2 value
-   */
-  public void setSoftware2Max(int software2Max)
-  {
-    this.software2Max = software2Max;
-  }
-
-  /**
-   * This method returns the minimum value for software3
-   *
-   * @return
-   */
-  public int getSoftware3Min()
-  {
-    return software3Min;
-  }
-
-  /**
-   * This method is used to set the minimum value for software3
-   *
-   * @param software3Min the minimum software3 value
-   */
-  public void setSoftware3Min(int software3Min)
-  {
-    this.software3Min = software3Min;
-  }
-
-  /**
-   * This method returns the max value for software3
-   *
-   * @return
-   */
-  public int getSoftware3Max()
-  {
-    return software3Max;
-  }
-
-  /**
-   * This method is used to set the max value for software3
-   *
-   * @param software3Max the max software3 value
-   */
-  public void setSoftware3Max(int software3Max)
-  {
-    this.software3Max = software3Max;
-  }
-
-  /**
-   * This method returns the minimum value for deviceId
-   *
-   * @return
-   */
-  public int getDeviceIdMin()
-  {
-    return deviceIdMin;
-  }
-
-  /**
-   * This method is used to set the minimum value for deviceId
-   *
-   * @param deviceIdMin the minimum deviceId value
-   */
-  public void setDeviceIdMin(int deviceIdMin)
-  {
-    this.deviceIdMin = deviceIdMin;
-  }
-
-  /**
-   * This method returns the max value for deviceId
-   *
-   * @return
-   */
-  public int getDeviceIdMax()
-  {
-    return deviceIdMax;
-  }
-
-  /**
-   * This method is used to set the max value for deviceId
-   *
-   * @param deviceIdMax the max deviceId value
-   */
-  public void setDeviceIdMax(int deviceIdMax)
-  {
-    this.deviceIdMax = deviceIdMax;
-  }
-
-  /**
-   * @return the tupleBlastSize
-   */
-  public int getTupleBlastSize()
-  {
-    return tupleBlastSize;
-  }
-
-  /**
-   * @param tupleBlastSize the tupleBlastSize to set
-   */
-  public void setTupleBlastSize(int tupleBlastSize)
-  {
-    this.tupleBlastSize = tupleBlastSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
deleted file mode 100644
index 3c74cc5..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.data;
-
-
-/**
- * This class stores the value of sum and the count of values summed.
- * <p>
- * AverageData class.
- * </p>
- *
- * @since 0.3.5
- */
-public class AverageData
-{
-
-  private long cpu;
-  private long hdd;
-  private long ram;
-  private long count;
-
-  /**
-   * This is default constructor that sets the sum and count to 0
-   */
-  public AverageData()
-  {
-
-  }
-
-  /**
-   * This constructor takes the value of sum and count and initialize the local attributes to corresponding values
-   *
-   * @param count
-   *          the value of count
-   */
-  public AverageData(long cpu,long hdd,long ram, long count)
-  {
-    this.cpu = cpu;
-    this.ram = ram;
-    this.hdd = hdd;
-    this.count = count;
-  }
-
-  public long getCpu()
-  {
-    return cpu;
-  }
-
-  public void setCpu(long cpu)
-  {
-    this.cpu = cpu;
-  }
-
-  public long getHdd()
-  {
-    return hdd;
-  }
-
-  public void setHdd(long hdd)
-  {
-    this.hdd = hdd;
-  }
-
-  public long getRam()
-  {
-    return ram;
-  }
-
-  public void setRam(long ram)
-  {
-    this.ram = ram;
-  }
-
-  /**
-   * This returns the value of count
-   * @return
-   */
-  public long getCount()
-  {
-    return count;
-  }
-
-  /**
-   * This method sets the value of count
-   * @param count
-   */
-  public void setCount(long count)
-  {
-    this.count = count;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
deleted file mode 100644
index 6f02a24..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.data;
-
-/**
- * This class stores the cpu% usage, ram% usage, hdd% usage and key information about a particular machine
- * <p>
- * MachineInfo class.
- * </p>
- *
- * @since 0.3.5
- */
-public class MachineInfo
-{
-  private MachineKey machineKey;
-  private int cpu;
-  private int ram;
-  private int hdd;
-
-  /**
-   * This default constructor
-   */
-  public MachineInfo()
-  {
-  }
-
-  /**
-   * This constructor takes MachineKey as input and initialize local attributes
-   *
-   * @param machineKey
-   *          the MachineKey instance
-   */
-  public MachineInfo(MachineKey machineKey)
-  {
-    this.machineKey = machineKey;
-  }
-
-  /**
-   * This constructor takes MachineKey, cpu usage, ram usage, hdd usage as input and initialize local attributes
-   *
-   * @param machineKey
-   *          the MachineKey instance
-   * @param cpu
-   *          the CPU% usage
-   * @param ram
-   *          the RAM% usage
-   * @param hdd
-   *          the HDD% usage
-   */
-  public MachineInfo(MachineKey machineKey, int cpu, int ram, int hdd)
-  {
-    this.machineKey = machineKey;
-    this.cpu = cpu;
-    this.ram = ram;
-    this.hdd = hdd;
-  }
-
-  /**
-   * This method returns the MachineKey
-   *
-   * @return
-   */
-  public MachineKey getMachineKey()
-  {
-    return machineKey;
-  }
-
-  /**
-   * This method sets the MachineKey
-   *
-   * @param machineKey
-   *          the MachineKey instance
-   */
-  public void setMachineKey(MachineKey machineKey)
-  {
-    this.machineKey = machineKey;
-  }
-
-  /**
-   * This method returns the CPU% usage
-   *
-   * @return
-   */
-  public int getCpu()
-  {
-    return cpu;
-  }
-
-  /**
-   * This method sets the CPU% usage
-   *
-   * @param cpu
-   *          the CPU% usage
-   */
-  public void setCpu(int cpu)
-  {
-    this.cpu = cpu;
-  }
-
-  /**
-   * This method returns the RAM% usage
-   *
-   * @return
-   */
-  public int getRam()
-  {
-    return ram;
-  }
-
-  /**
-   * This method sets the RAM% usage
-   *
-   * @param ram
-   *          the RAM% usage
-   */
-  public void setRam(int ram)
-  {
-    this.ram = ram;
-  }
-
-  /**
-   * This method returns the HDD% usage
-   *
-   * @return
-   */
-  public int getHdd()
-  {
-    return hdd;
-  }
-
-  /**
-   * This method sets the HDD% usage
-   *
-   * @param hdd
-   *          the HDD% usage
-   */
-  public void setHdd(int hdd)
-  {
-    this.hdd = hdd;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
deleted file mode 100644
index 2b3bb1c..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.data;
-
-/**
- * This class stores the information about various softwares, deviceIds, OS of the device
- * <p>
- * MachineKey class.
- * </p>
- *
- * @since 0.3.5
- */
-public class MachineKey
-{
-
-  private Integer customer;
-  private Integer product;
-  private Integer os;
-  private Integer software1;
-  private Integer software2;
-  private Integer software3;
-  private Integer deviceId;
-  private String timeKey;
-  private String day;
-
-  /**
-   * This constructor takes the format in which time has to be captured and the day when this instance is created
-   *
-   * @param timeKey the format in which time has to be captured
-   * @param day the day when this instance is created
-   */
-  public MachineKey(String timeKey, String day)
-  {
-    this.timeKey = timeKey;
-    this.day = day;
-  }
-
-  /**
-   * This is default constructor
-   */
-  public MachineKey()
-  {
-  }
-
-  /**
-   * This constructor takes format in which time has to be captured, the day when this instance is created, the customer
-   * id, product Id on the device, OS version on the device, software1 version on the device, software2 version on the device,
-   * software3 version on the device, deviceId on the device,
-   *
-   * @param timeKey the format in which time has to be captured
-   * @param day the day when this instance is created
-   * @param customer the customer Id
-   * @param product product Id
-   * @param os OS version
-   * @param software1 software1 version
-   * @param software2 software2 version
-   * @param software3 software3 version
-   * @param deviceId deviceId
-   */
-  public MachineKey(String timeKey, String day, Integer customer, Integer product, Integer os, Integer software1, Integer software2, Integer software3, Integer deviceId)
-  {
-    this.timeKey = timeKey;
-    this.day = day;
-    this.customer = customer;
-    this.product = product;
-    this.os = os;
-    this.software1 = software1;
-    this.software2 = software2;
-    this.software3 = software3;
-    this.deviceId = deviceId;
-  }
-
-  /**
-   * This method returns the format in which the time is captured. The time is the time when this instance of MachineKey
-   * was generated. For e.g. HHmm to capture Hour and minute
-   *
-   * @return
-   */
-  public String getTimeKey()
-  {
-    return timeKey;
-  }
-
-  /**
-   * This method sets the format in which the time is captured. The time is the time when this instance of MachineKey
-   * was generated. For e.g. HHmm to capture Hour and minute
-   *
-   * @param timeKey
-   *          the value of format
-   */
-  public void setTimeKey(String timeKey)
-  {
-    this.timeKey = timeKey;
-  }
-
-  /**
-   * This method returns the day of the month when this instance of MachineKey was generated
-   *
-   * @return
-   */
-  public String getDay()
-  {
-    return day;
-  }
-
-  /**
-   * This method sets the day of the month when this instance of MachineKey was generated
-   *
-   * @param day
-   *          the day of the month
-   */
-  public void setDay(String day)
-  {
-    this.day = day;
-  }
-
-  /**
-   * This method returns the customer Id
-   *
-   * @return
-   */
-  public Integer getCustomer()
-  {
-    return customer;
-  }
-
-  /**
-   * This method sets the customer Id
-   *
-   * @param customer
-   *          the customer Id
-   */
-  public void setCustomer(Integer customer)
-  {
-    this.customer = customer;
-  }
-
-  /**
-   * This method returns product on the device
-   *
-   * @return
-   */
-  public Integer getProduct()
-  {
-    return product;
-  }
-
-  /**
-   * This method sets the product on the device
-   *
-   * @param product
-   *          the value of product
-   */
-  public void setProduct(Integer product)
-  {
-    this.product = product;
-  }
-
-  /**
-   * This method returns the OS version on the device
-   *
-   * @return
-   */
-  public Integer getOs()
-  {
-    return os;
-  }
-
-  /**
-   * This method sets the OS version on the device
-   *
-   * @param os
-   *          OS version
-   */
-  public void setOs(Integer os)
-  {
-    this.os = os;
-  }
-
-  /**
-   * This method returns the version of the software1 on the device
-   *
-   * @return
-   */
-  public Integer getSoftware1()
-  {
-    return software1;
-  }
-
-  /**
-   * This method sets the version of the software1 on the device
-   *
-   * @param software1 the version of the software1
-   */
-  public void setSoftware1(Integer software1)
-  {
-    this.software1 = software1;
-  }
-
-  /**
-   * This method returns the version of the software2 on the device
-   *
-   * @return
-   */
-  public Integer getSoftware2()
-  {
-    return software2;
-  }
-
-  /**
-   * This method sets the version of the software2 on the device
-   *
-   * @param software2
-   *          the version of the software2
-   */
-  public void setSoftware2(Integer software2)
-  {
-    this.software2 = software2;
-  }
-
-  /**
-   * This method returns the version of the software3 on the device
-   *
-   * @return
-   */
-  public Integer getSoftware3()
-  {
-    return software3;
-  }
-
-  /**
-   * This method sets the version of the software3 on the device
-   *
-   * @param software3
-   *          the version of the software3
-   */
-  public void setSoftware3(Integer software3)
-  {
-    this.software3 = software3;
-  }
-
-  @Override
-  public int hashCode()
-  {
-    int key = 0;
-    if (customer != null) {
-      key |= (1 << 31);
-      key ^= customer;
-    }
-    if (product != null) {
-      key |= (1 << 30);
-      key ^= product;
-    }
-    if (os != null) {
-      key |= (1 << 29);
-      key ^= os;
-    }
-    if (software1 != null) {
-      key |= (1 << 28);
-      key ^= software1;
-    }
-    if (software2 != null) {
-      key |= (1 << 27);
-      key ^= software2;
-    }
-    if (software3 != null) {
-      key |= (1 << 26);
-      key ^= software3;
-    }
-    if (deviceId != null) {
-      key |= (1 << 25);
-      key ^= deviceId;
-    }
-    if (timeKey != null) {
-      key |= (1 << 24);
-      key ^= timeKey.hashCode();
-    }
-    if (day != null) {
-      key |= (1 << 23);
-      key ^= day.hashCode();
-    }
-
-    return key;
-  }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (!(obj instanceof MachineKey)) {
-      return false;
-    }
-    MachineKey mkey = (MachineKey)obj;
-    return checkStringEqual(this.timeKey, mkey.timeKey) && checkStringEqual(this.day, mkey.day) && checkIntEqual(this.customer, mkey.customer) && checkIntEqual(this.product, mkey.product) && checkIntEqual(this.os, mkey.os) && checkIntEqual(this.software1, mkey.software1) && checkIntEqual(this.software2, mkey.software2) && checkIntEqual(this.software3, mkey.software3) && checkIntEqual(this.deviceId, mkey.deviceId);
-  }
-
-  private boolean checkIntEqual(Integer a, Integer b)
-  {
-    if ((a == null) && (b == null)) {
-      return true;
-    }
-    if ((a != null) && a.equals(b)) {
-      return true;
-    }
-    return false;
-  }
-
-  private boolean checkStringEqual(String a, String b)
-  {
-    if ((a == null) && (b == null)) {
-      return true;
-    }
-    if ((a != null) && a.equals(b)) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public String toString()
-  {
-    StringBuilder sb = new StringBuilder(timeKey);
-    if (customer != null) {
-      sb.append("|0:").append(customer);
-    }
-    if (product != null) {
-      sb.append("|1:").append(product);
-    }
-    if (os != null) {
-      sb.append("|2:").append(os);
-    }
-    if (software1 != null) {
-      sb.append("|3:").append(software1);
-    }
-    if (software2 != null) {
-      sb.append("|4:").append(software2);
-    }
-    if (software3 != null) {
-      sb.append("|5:").append(software3);
-    }
-    if (deviceId != null) {
-      sb.append("|6:").append(deviceId);
-    }
-    return sb.toString();
-  }
-
-  /**
-   * This method returns the deviceId of the device
-   * @return The deviceId
-   */
-  public Integer getDeviceId()
-  {
-    return deviceId;
-  }
-
-  /**
-   * This method sets the deviceId of the device
-   *
-   * @param deviceId
-   */
-  public void setDeviceId(Integer deviceId)
-  {
-    this.deviceId = deviceId;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
deleted file mode 100644
index d474c5c..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.data;
-
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-
-/**
- * This class captures the resources whose usage is collected for each device
- * <p>ResourceType class.</p>
- *
- * @since 0.3.5
- */
-public enum ResourceType
-{
-
-  CPU("cpu"), RAM("ram"), HDD("hdd");
-
-  private static Map<String, ResourceType> descToResource = Maps.newHashMap();
-
-  static {
-    for (ResourceType type : ResourceType.values()) {
-      descToResource.put(type.desc, type);
-    }
-  }
-
-  private String desc;
-
-  private ResourceType(String desc)
-  {
-    this.desc = desc;
-  }
-
-  @Override
-  public String toString()
-  {
-    return desc;
-  }
-
-  /**
-   * This method returns ResourceType for the given description
-   * @param desc the description
-   * @return
-   */
-  public static ResourceType getResourceTypeOf(String desc)
-  {
-    return descToResource.get(desc);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
deleted file mode 100644
index 8f68dab..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.operator;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import javax.validation.constraints.Max;
-import javax.validation.constraints.Min;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.data.ResourceType;
-import com.datatorrent.demos.machinedata.util.DataTable;
-import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * <p>
- * CalculatorOperator class.
- * </p>
- *
- * @since 0.3.5
- */
-public class CalculatorOperator extends BaseOperator
-{
-
-  private final DataTable<MachineKey, ResourceType, List<Integer>> data = new DataTable<>();
-
-  @Min(1)
-  @Max(99)
-  private int kthPercentile = 95; // kth percentile
-  private boolean computePercentile;
-  private boolean computeSD;
-  private boolean computeMax;
-
-  private int percentileThreshold = 80;
-  private int sdThreshold = 70;
-  private int maxThreshold = 99;
-
-  public final transient DefaultInputPort<MachineInfo> dataPort = new DefaultInputPort<MachineInfo>()
-  {
-    @Override
-    public void process(MachineInfo tuple)
-    {
-      addDataToCache(tuple);
-    }
-
-    /**
-     * Stream codec used for partitioning.
-     */
-    @Override
-    public StreamCodec<MachineInfo> getStreamCodec()
-    {
-      return new MachineInfoStreamCodec();
-    }
-  };
-
-  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> percentileOutputPort = new DefaultOutputPort<>();
-
-  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> sdOutputPort = new DefaultOutputPort<>();
-
-  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Integer>>> maxOutputPort = new DefaultOutputPort<>();
-
-  public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>();
-
-  private void addDataToCache(MachineInfo tuple)
-  {
-    MachineKey machineKey = tuple.getMachineKey();
-    if (!data.containsRow(machineKey)) {
-      data.put(machineKey, ResourceType.CPU, Lists.<Integer>newArrayList());
-      data.put(machineKey, ResourceType.RAM, Lists.<Integer>newArrayList());
-      data.put(machineKey, ResourceType.HDD, Lists.<Integer>newArrayList());
-    }
-    data.get(machineKey, ResourceType.CPU).add(tuple.getCpu());
-    data.get(machineKey, ResourceType.RAM).add(tuple.getRam());
-    data.get(machineKey, ResourceType.HDD).add(tuple.getHdd());
-  }
-
-  @Override
-  public void endWindow()
-  {
-
-    if (computePercentile) {
-      for (MachineKey machineKey : data.rowKeySet()) {
-        Collections.sort(data.get(machineKey, ResourceType.CPU));
-        Collections.sort(data.get(machineKey, ResourceType.RAM));
-        Collections.sort(data.get(machineKey, ResourceType.HDD));
-
-        Map<ResourceType, Double> percentileData = Maps.newHashMap();
-        percentileData.put(ResourceType.CPU, getKthPercentile(data.get(machineKey, ResourceType.CPU)));
-        percentileData.put(ResourceType.RAM, getKthPercentile(data.get(machineKey, ResourceType.RAM)));
-        percentileData.put(ResourceType.HDD, getKthPercentile(data.get(machineKey, ResourceType.HDD)));
-        percentileOutputPort.emit(new KeyValPair<>(machineKey, percentileData));
-
-        for (ResourceType resourceType : percentileData.keySet()) {
-          double percentileValue = percentileData.get(resourceType);
-          if (percentileValue > percentileThreshold) {
-            emitAlert(resourceType, machineKey, percentileValue, "Percentile");
-          }
-        }
-      }
-    }
-    if (computeSD) {
-      for (MachineKey machineKey : data.rowKeySet()) {
-
-        Map<ResourceType, Double> sdData = Maps.newHashMap();
-
-        for (ResourceType resourceType : ResourceType.values()) {
-          sdData.put(resourceType, getSD(data.get(machineKey, resourceType)));
-        }
-        sdOutputPort.emit(new KeyValPair<>(machineKey, sdData));
-
-        for (ResourceType resourceType : sdData.keySet()) {
-          double sdValue = sdData.get(resourceType);
-          if (sdValue > sdThreshold) {
-            emitAlert(resourceType, machineKey, sdValue, "SD");
-          }
-        }
-      }
-    }
-    if (computeMax) {
-      for (MachineKey machineKey : data.rowKeySet()) {
-
-        Map<ResourceType, Integer> maxData = Maps.newHashMap();
-        maxData.put(ResourceType.CPU, Collections.max(data.get(machineKey, ResourceType.CPU)));
-        maxData.put(ResourceType.RAM, Collections.max(data.get(machineKey, ResourceType.RAM)));
-        maxData.put(ResourceType.HDD, Collections.max(data.get(machineKey, ResourceType.HDD)));
-
-        maxOutputPort.emit(new KeyValPair<>(machineKey, maxData));
-
-        for (ResourceType resourceType : maxData.keySet()) {
-          double sdValue = maxData.get(resourceType).doubleValue();
-          if (sdValue > maxThreshold) {
-            emitAlert(resourceType, machineKey, sdValue, "Max");
-          }
-        }
-      }
-    }
-    data.clear();
-  }
-
-  private void emitAlert(ResourceType type, MachineKey machineKey, double alertVal, String prefix)
-  {
-    BigDecimal decimalVal = new BigDecimal(alertVal);
-    decimalVal = decimalVal.setScale(2, BigDecimal.ROUND_HALF_UP);
-    String alertTime = machineKey.getDay() + machineKey.getTimeKey();
-    smtpAlert.emit(prefix + "-" + type.toString().toUpperCase() + " alert at " + alertTime + " " + type + " usage breached current usage: " + decimalVal.doubleValue() + "% threshold: " + percentileThreshold + "%\n\n" + machineKey);
-  }
-
-  private double getKthPercentile(List<Integer> sorted)
-  {
-
-    double val = (kthPercentile * sorted.size()) / 100.0;
-    if (val == (int)val) {
-      // Whole number
-      int idx = (int)val - 1;
-      return (sorted.get(idx) + sorted.get(idx + 1)) / 2.0;
-    } else {
-      int idx = (int)Math.round(val) - 1;
-      return sorted.get(idx);
-    }
-  }
-
-  private double getSD(List<Integer> data)
-  {
-    int sum = 0;
-    for (int i : data) {
-      sum += i;
-    }
-    double avg = sum / (data.size() * 1.0);
-    double sd = 0;
-    for (Integer point : data) {
-      sd += Math.pow(point - avg, 2);
-    }
-    return Math.sqrt(sd);
-  }
-
-  /**
-   * @param kVal the percentile which will be emitted by this operator
-   */
-  public void setKthPercentile(int kVal)
-  {
-    this.kthPercentile = kVal;
-  }
-
-  /**
-   * @param doCompute when true percentile will be computed
-   */
-  public void setComputePercentile(boolean doCompute)
-  {
-    this.computePercentile = doCompute;
-  }
-
-  /**
-   * @param doCompute when true standard deviation will be computed
-   */
-  public void setComputeSD(boolean doCompute)
-  {
-    this.computeSD = doCompute;
-  }
-
-  /**
-   * @param doCompute when true max will be computed
-   */
-  public void setComputeMax(boolean doCompute)
-  {
-    this.computeMax = doCompute;
-  }
-
-  /**
-   * @param threshold for percentile when breached will cause alert
-   */
-  public void setPercentileThreshold(int threshold)
-  {
-    this.percentileThreshold = threshold;
-  }
-
-  /**
-   * @param threshold for standard deviation when breached will cause alert
-   */
-  public void setSDThreshold(int threshold)
-  {
-    this.sdThreshold = threshold;
-  }
-
-  /**
-   * @param threshold for Max when breached will cause alert
-   */
-  public void setMaxThreshold(int threshold)
-  {
-    this.maxThreshold = threshold;
-  }
-
-  public static class MachineInfoStreamCodec extends KryoSerializableStreamCodec<MachineInfo> implements Serializable
-  {
-    public MachineInfoStreamCodec()
-    {
-      super();
-    }
-
-    @Override
-    public int getPartition(MachineInfo o)
-    {
-      return Objects.hashCode(o.getMachineKey().getCustomer(), o.getMachineKey().getOs(), o.getMachineKey().getProduct(), o.getMachineKey().getSoftware1(), o.getMachineKey().getSoftware2(), o.getMachineKey().getSoftware3());
-    }
-
-    private static final long serialVersionUID = 201411031403L;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
deleted file mode 100644
index bbfd547..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.operator;
-
-import java.math.BigDecimal;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-import com.datatorrent.demos.machinedata.data.AverageData;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.lib.util.KeyHashValPair;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * This class calculates the average for various resources across different devices for a given key
- * <p>MachineInfoAveragingOperator class.</p>
- *
- * @since 0.9.0
- */
-@SuppressWarnings("unused")
-public class MachineInfoAveragingOperator extends BaseOperator
-{
-
-  public static final String CPU = "cpu";
-  public static final String RAM = "ram";
-  public static final String HDD = "hdd";
-  public static final String DAY = "day";
-
-  private final transient Map<MachineKey, AverageData> dataMap = new HashMap<>();
-
-  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>> outputPort = new DefaultOutputPort<>();
-
-  public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>();
-
-  private int threshold = 95;
-
-  /**
-   * Buffer all the tuples as is till end window gets called
-   */
-  public final transient DefaultInputPort<KeyHashValPair<MachineKey, AverageData>> inputPort = new DefaultInputPort<KeyHashValPair<MachineKey, AverageData>>()
-  {
-
-    @Override
-    public void process(KeyHashValPair<MachineKey, AverageData> tuple)
-    {
-      addTuple(tuple);
-    }
-  };
-
-  /**
-   * This method returns the threshold value
-   *
-   * @return
-   */
-  public int getThreshold()
-  {
-    return threshold;
-  }
-
-  /**
-   * This method sets the threshold value. If the average usage for any Resource is above this for a given key, then the alert is sent
-   *
-   * @param threshold the threshold value
-   */
-  public void setThreshold(int threshold)
-  {
-    this.threshold = threshold;
-  }
-
-  /**
-   * This adds the given tuple to the dataMap
-   *
-   * @param tuple input tuple
-   */
-  private void addTuple(KeyHashValPair<MachineKey, AverageData> tuple)
-  {
-    MachineKey key = tuple.getKey();
-    dataMap.put(key, tuple.getValue());
-  }
-
-  @Override
-  public void endWindow()
-  {
-    for (Map.Entry<MachineKey, AverageData> entry : dataMap.entrySet()) {
-      MachineKey key = entry.getKey();
-      AverageData averageResultMap = entry.getValue();
-      Map<String, String> averageResult = Maps.newHashMap();
-      long count = averageResultMap.getCount();
-      double average = averageResultMap.getCpu() / count;
-      averageResult.put(CPU, average + "");
-      emitAlert(average, CPU, key);
-      average = averageResultMap.getHdd() / count;
-      averageResult.put(HDD, average + "");
-      emitAlert(average, HDD, key);
-      average = averageResultMap.getRam() / count;
-      averageResult.put(RAM, average + "");
-      emitAlert(average, RAM, key);
-      averageResult.put(DAY, key.getDay());
-      outputPort.emit(new KeyValPair<>(key, averageResult));
-    }
-    dataMap.clear();
-  }
-
-  private void emitAlert(double average, String resourceType, MachineKey key)
-  {
-    if (average > threshold) {
-      BigDecimal bd = new BigDecimal(average);
-      bd = bd.setScale(2, BigDecimal.ROUND_HALF_UP);
-      String stime = key.getDay() + key.getTimeKey();
-      String skey = getKeyInfo(key);
-      smtpAlert.emit(resourceType.toUpperCase() + " alert at " + stime + " " + resourceType + " usage breached current usage: " + bd.doubleValue() + "% threshold: " + threshold + "%\n\n" + skey);
-    }
-  }
-
-  /**
-   * This method is used to artificially generate alerts
-   *
-   * @param genAlert
-   */
-  public void setGenAlert(boolean genAlert)
-  {
-    Calendar calendar = Calendar.getInstance();
-    long timestamp = System.currentTimeMillis();
-    calendar.setTimeInMillis(timestamp);
-    DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
-    Date date = calendar.getTime();
-    String timeKey = minuteDateFormat.format(date);
-    DateFormat dayDateFormat = new SimpleDateFormat("dd");
-    String day = dayDateFormat.format(date);
-
-    MachineKey alertKey = new MachineKey(timeKey, day);
-    alertKey.setCustomer(1);
-    alertKey.setProduct(5);
-    alertKey.setOs(10);
-    alertKey.setSoftware1(12);
-    alertKey.setSoftware2(14);
-    alertKey.setSoftware3(6);
-
-    MachineInfo machineInfo = new MachineInfo();
-    machineInfo.setMachineKey(alertKey);
-    machineInfo.setCpu(threshold + 1);
-    machineInfo.setRam(threshold + 1);
-    machineInfo.setHdd(threshold + 1);
-
-    smtpAlert.emit("CPU Alert: CPU Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
-    smtpAlert.emit("RAM Alert: RAM Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
-    smtpAlert.emit("HDD Alert: HDD Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
-  }
-
-  /**
-   * This method returns the String for a given MachineKey instance
-   *
-   * @param key MachineKey instance that needs to be converted to string
-   * @return
-   */
-  private String getKeyInfo(MachineKey key)
-  {
-    StringBuilder sb = new StringBuilder();
-    if (key instanceof MachineKey) {
-      MachineKey mkey = (MachineKey)key;
-      Integer customer = mkey.getCustomer();
-      if (customer != null) {
-        sb.append("customer: " + customer + "\n");
-      }
-      Integer product = mkey.getProduct();
-      if (product != null) {
-        sb.append("product version: " + product + "\n");
-      }
-      Integer os = mkey.getOs();
-      if (os != null) {
-        sb.append("os version: " + os + "\n");
-      }
-      Integer software1 = mkey.getSoftware1();
-      if (software1 != null) {
-        sb.append("software1 version: " + software1 + "\n");
-      }
-      Integer software2 = mkey.getSoftware2();
-      if (software2 != null) {
-        sb.append("software2 version: " + software2 + "\n");
-      }
-      Integer software3 = mkey.getSoftware3();
-      if (software3 != null) {
-        sb.append("software3 version: " + software3 + "\n");
-      }
-    }
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
deleted file mode 100644
index cb5fa5a..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.demos.machinedata.data.AverageData;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.lib.util.KeyHashValPair;
-
-/**
- * This class calculates the partial sum and count for tuples generated by upstream Operator
- * <p> MachineInfoAveragingPrerequisitesOperator class. </p>
- *
- * @since 0.3.5
- */
-public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator
-{
-
-  // Aggregate sum of all values seen for a key.
-  private Map<MachineKey, AverageData> sums = new HashMap<>();
-
-  public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>>()
-  {
-    @Override
-    public Unifier<KeyHashValPair<MachineKey, AverageData>> getUnifier()
-    {
-      MachineInfoAveragingUnifier unifier = new MachineInfoAveragingUnifier();
-      return unifier;
-    }
-  };
-
-  public transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
-  {
-
-    @Override
-    public void process(MachineInfo tuple)
-    {
-      MachineKey key = tuple.getMachineKey();
-      AverageData averageData = sums.get(key);
-      if (averageData == null) {
-        averageData = new AverageData(tuple.getCpu(), tuple.getHdd(), tuple.getRam(), 1);
-        sums.put(key, averageData);
-      } else {
-        averageData.setCpu(averageData.getCpu() + tuple.getCpu());
-        averageData.setRam(averageData.getRam() + tuple.getRam());
-        averageData.setHdd(averageData.getHdd() + tuple.getHdd());
-        averageData.setCount(averageData.getCount() + 1);
-      }
-    }
-  };
-
-  @Override
-  public void endWindow()
-  {
-
-    for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) {
-      if (outputPort.isConnected()) {
-        outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue()));
-      }
-    }
-    sums.clear();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
deleted file mode 100644
index e0b67f3..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.Unifier;
-
-import com.datatorrent.demos.machinedata.data.AverageData;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.lib.util.KeyHashValPair;
-
-/**
- * This class calculates the partial sum and count for a given key
- * <p>MachineInfoAveragingUnifier class.</p>
- *
- * @since 0.9.0
- */
-public class MachineInfoAveragingUnifier implements Unifier<KeyHashValPair<MachineKey, AverageData>>
-{
-
-  private Map<MachineKey, AverageData> sums = new HashMap<>();
-  public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<>();
-
-  @Override
-  public void beginWindow(long arg0)
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void endWindow()
-  {
-    for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) {
-      outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue()));
-    }
-    sums.clear();
-
-  }
-
-  @Override
-  public void setup(OperatorContext arg0)
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void teardown()
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void process(KeyHashValPair<MachineKey, AverageData> arg0)
-  {
-    MachineKey tupleKey = arg0.getKey();
-    AverageData averageData = sums.get(tupleKey);
-    AverageData tupleValue = arg0.getValue();
-    if (averageData == null) {
-      sums.put(tupleKey, tupleValue);
-    } else {
-      averageData.setCpu(averageData.getCpu() + tupleValue.getCpu());
-      averageData.setRam(averageData.getRam() + tupleValue.getRam());
-      averageData.setHdd(averageData.getHdd() + tupleValue.getHdd());
-      averageData.setCount(averageData.getCount() + tupleValue.getCount());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
deleted file mode 100644
index 6c4256a..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.util;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Generate combinations of elements for the given array of elements.
- *
- * Implements nCr = n! / (r! * (n-r)!)
- *
- * @since 0.3.5
- */
-public class Combinatorics<T>
-{
-
-  private T[] values;
-  private int size = -1;
-  private List<T> result;
-  private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>();
-  private int resultMapSize = 0;
-
-  /**
-   * Generates all possible combinations with all the sizes.
-   *
-   * @param values
-   */
-  public Combinatorics(T[] values)
-  {
-    this.values = values;
-    this.size = -1;
-    this.result = new ArrayList<>();
-  }
-
-  /**
-   * Generates all possible combinations with the given size.
-   *
-   * @param values
-   * @param size
-   */
-  public Combinatorics(T[] values, int size)
-  {
-    this.values = values;
-    this.size = size;
-    this.result = new ArrayList<>();
-  }
-
-  public Map<Integer, List<T>> generate()
-  {
-
-    if (size == -1) {
-      size = values.length;
-      for (int i = 1; i <= size; i++) {
-        int[] tmp = new int[i];
-        Arrays.fill(tmp, -1);
-        generateCombinations(0, 0, tmp);
-      }
-    } else {
-      int[] tmp = new int[size];
-      Arrays.fill(tmp, -1);
-      generateCombinations(0, 0, tmp);
-    }
-    return resultMap;
-  }
-
-  public void generateCombinations(int start, int depth, int[] tmp)
-  {
-    if (depth == tmp.length) {
-      for (int j = 0; j < depth; j++) {
-        result.add(values[tmp[j]]);
-      }
-      resultMap.put(++resultMapSize, result);
-      result = new ArrayList<>();
-      return;
-    }
-    for (int i = start; i < values.length; i++) {
-      tmp[depth] = i;
-      generateCombinations(i + 1, depth + 1, tmp);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
deleted file mode 100644
index f8f2d33..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.util;
-
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.collect.Maps;
-
-/**
- * <p>DataTable class.</p>
- *
- * @since 0.3.5
- */
-public class DataTable<R,C,E>
-{
-
-  //machineKey, [cpu,ram,hdd] -> value
-  private final Map<R,Map<C,E>> table = Maps.newHashMap();
-
-  public boolean containsRow(R rowKey)
-  {
-    return table.containsKey(rowKey);
-  }
-
-  public void put(R rowKey,C colKey, E entry)
-  {
-    if (!containsRow(rowKey)) {
-      table.put(rowKey, Maps.<C,E>newHashMap());
-    }
-    table.get(rowKey).put(colKey, entry);
-  }
-
-  @Nullable
-  public E get(R rowKey, C colKey)
-  {
-    if (!containsRow(rowKey)) {
-      return null;
-    }
-    return table.get(rowKey).get(colKey);
-  }
-
-  public Set<R> rowKeySet()
-  {
-    return table.keySet();
-  }
-
-  public void clear()
-  {
-    table.clear();
-  }
-
-  public Map<R,Map<C,E>> getTable()
-  {
-    return table;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/resources/META-INF/properties.xml b/demos/machinedata/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index afe8783..0000000
--- a/demos/machinedata/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,139 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <property>
-    <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
-    <value>1000</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Receiver.attr.APPLICATION_WINDOW_COUNT
-    </name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Receiver.attr.PARTITIONER</name>
-    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.DimensionsGenerator.attr.APPLICATION_WINDOW_COUNT
-    </name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.stream.Events.locality
-    </name>
-    <value>CONTAINER_LOCAL</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.DimensionsGenerator.inputport.inputPort.attr.PARTITION_PARALLEL
-    </name>
-    <value>true</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Aggregator.inputport.inputPort.attr.PARTITION_PARALLEL
-    </name>
-    <value>true</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Aggregator.attr.APPLICATION_WINDOW_COUNT
-    </name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.AverageCalculator.attr.APPLICATION_WINDOW_COUNT
-    </name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Persister.inputport.input.attr.PARTITION_PARALLEL
-    </name>
-    <value>true</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Persister.attr.APPLICATION_WINDOW_COUNT
-    </name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Persister.store.dbIndex
-    </name>
-    <value>2</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Persister.store.host
-    </name>
-    <value></value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Persister.store.port
-    </name>
-    <value>6379</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.port.*.attr.QUEUE_CAPACITY</name>
-    <value>32000</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Alerter.from
-    </name>
-    <value></value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Alerter.subject
-    </name>
-    <value>Alert!!!</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Alerter.recipients.TO
-    </name>
-    <value></value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Alerter.content
-    </name>
-    <value>{}</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Alerter.smtpHost
-    </name>
-    <value>localhost</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Alerter.smtpPort
-    </name>
-    <value>25</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Alerter.useSsl
-    </name>
-    <value>false</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.operator.Aggregator.outputport.outputPort.attr.UNIFIER_LIMIT
-    </name>
-    <value>8</value>
-  </property>
-  <property>
-    <name>dt.application.MachineDataDemo.stream.DimensionalData.locality
-    </name>
-    <value>THREAD_LOCAL</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/site/conf/my-app-conf1.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/site/conf/my-app-conf1.xml b/demos/machinedata/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index f35873b..0000000
--- a/demos/machinedata/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java b/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
deleted file mode 100644
index 0e397be..0000000
--- a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.data.ResourceType;
-import com.datatorrent.demos.machinedata.operator.CalculatorOperator;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.lib.util.TimeBucketKey;
-
-/**
- * @since 0.3.5
- */
-public class CalculatorOperatorTest
-{
-  private static DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
-  private static Logger LOG = LoggerFactory.getLogger(CalculatorOperatorTest.class);
-
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    CalculatorOperator calculatorOperator = new CalculatorOperator();
-    calculatorOperator.setup(null);
-
-    calculatorOperator.setComputePercentile(true);
-    calculatorOperator.setComputeMax(true);
-    calculatorOperator.setComputeSD(true);
-
-    testPercentile(calculatorOperator);
-  }
-
-  public void testPercentile(CalculatorOperator oper)
-  {
-
-    CollectorTestSink sortSink = new CollectorTestSink();
-    oper.percentileOutputPort.setSink(sortSink);
-    oper.setKthPercentile(50);
-    Calendar calendar = Calendar.getInstance();
-    Date date = calendar.getTime();
-    String timeKey = minuteDateFormat.format(date);
-    String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
-
-    Integer vs = new Integer(1);
-    MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
-
-    oper.beginWindow(0);
-
-    MachineInfo info = new MachineInfo(mk, 1, 1, 1);
-    oper.dataPort.process(info);
-
-    info.setCpu(2);
-    oper.dataPort.process(info);
-
-    info.setCpu(3);
-    oper.dataPort.process(info);
-
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    for (Object o : sortSink.collectedTuples) {
-      LOG.debug(o.toString());
-      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
-      Assert.assertEquals("emitted value for 'cpu' was ", 2.0, keyValPair.getValue().get(ResourceType.CPU), 0);
-      Assert.assertEquals("emitted value for 'hdd' was ", 1.0, keyValPair.getValue().get(ResourceType.HDD), 0);
-      Assert.assertEquals("emitted value for 'ram' was ", 1.0, keyValPair.getValue().get(ResourceType.RAM), 0);
-
-    }
-    LOG.debug("Done percentile testing\n");
-
-  }
-
-  public void testStandarDeviation(CalculatorOperator oper)
-  {
-    CollectorTestSink sortSink = new CollectorTestSink();
-    oper.sdOutputPort.setSink(sortSink);
-    Calendar calendar = Calendar.getInstance();
-    Date date = calendar.getTime();
-    String timeKey = minuteDateFormat.format(date);
-    String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
-
-    Integer vs = new Integer(1);
-    MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
-
-    oper.beginWindow(0);
-
-    MachineInfo info = new MachineInfo(mk, 1, 1, 1);
-    oper.dataPort.process(info);
-
-    info.setCpu(2);
-    oper.dataPort.process(info);
-
-    info.setCpu(3);
-    oper.dataPort.process(info);
-
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    for (Object o : sortSink.collectedTuples) {
-      LOG.debug(o.toString());
-      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
-      Assert.assertEquals("emitted value for 'cpu' was ", getSD(ImmutableList.of(1, 2, 3)), keyValPair.getValue().get(ResourceType.CPU), 0);
-      Assert.assertEquals("emitted value for 'hdd' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.HDD), 0);
-      Assert.assertEquals("emitted value for 'ram' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.RAM), 0);
-
-    }
-    LOG.debug("Done sd testing\n");
-
-  }
-
-  private final double getSD(List<Integer> input)
-  {
-    int sum = 0;
-    for (int i : input) {
-      sum += i;
-    }
-    double avg = sum / (input.size() * 1.0);
-    double sd = 0;
-    for (Integer point : input) {
-      sd += Math.pow(point - avg, 2);
-    }
-    return Math.sqrt(sd);
-  }
-
-  public void testMax(CalculatorOperator oper)
-  {
-    CollectorTestSink sortSink = new CollectorTestSink();
-    oper.maxOutputPort.setSink(sortSink);
-    Calendar calendar = Calendar.getInstance();
-    Date date = calendar.getTime();
-    String timeKey = minuteDateFormat.format(date);
-    String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
-
-    Integer vs = new Integer(1);
-    MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
-
-    oper.beginWindow(0);
-
-    MachineInfo info = new MachineInfo(mk, 1, 1, 1);
-    oper.dataPort.process(info);
-
-    info.setCpu(2);
-    oper.dataPort.process(info);
-
-    info.setCpu(3);
-    oper.dataPort.process(info);
-
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
-    for (Object o : sortSink.collectedTuples) {
-      LOG.debug(o.toString());
-      KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
-      Assert.assertEquals("emitted value for 'cpu' was ", 3, keyValPair.getValue().get(ResourceType.CPU), 0);
-      Assert.assertEquals("emitted value for 'hdd' was ", 1, keyValPair.getValue().get(ResourceType.HDD), 0);
-      Assert.assertEquals("emitted value for 'ram' was ", 1, keyValPair.getValue().get(ResourceType.RAM), 0);
-
-    }
-    LOG.debug("Done max testing\n");
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/test/resources/log4j.properties b/demos/machinedata/src/test/resources/log4j.properties
deleted file mode 100644
index cf0d19e..0000000
--- a/demos/machinedata/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug