You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:13 UTC
[08/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java
new file mode 100644
index 0000000..8132c7d
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/Application.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.machinedata.data.MachineKey;
+import org.apache.apex.examples.machinedata.operator.MachineInfoAveragingOperator;
+import org.apache.apex.examples.machinedata.operator.MachineInfoAveragingPrerequisitesOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
+import com.datatorrent.lib.io.SmtpOutputOperator;
+
+/**
+ * <p>
+ * Resource monitor application.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+@ApplicationAnnotation(name = "MachineDataExample")
+@SuppressWarnings("unused")
+public class Application implements StreamingApplication
+{
+
+ private static final Logger LOG = LoggerFactory.getLogger(Application.class);
+
+ /**
+ * This function sets up the DAG for calculating the average
+ *
+ * @param dag the DAG instance
+ * @param conf the configuration instance
+ * @return MachineInfoAveragingPrerequisitesOperator
+ */
+ private MachineInfoAveragingPrerequisitesOperator addAverageCalculation(DAG dag, Configuration conf)
+ {
+ MachineInfoAveragingPrerequisitesOperator prereqAverageOper = dag.addOperator("Aggregator", MachineInfoAveragingPrerequisitesOperator.class);
+ MachineInfoAveragingOperator averageOperator = dag.addOperator("AverageCalculator", MachineInfoAveragingOperator.class);
+ RedisKeyValPairOutputOperator<MachineKey, Map<String, String>> redisAvgOperator = dag.addOperator("Persister", new RedisKeyValPairOutputOperator<MachineKey, Map<String, String>>());
+ dag.addStream("Average", averageOperator.outputPort, redisAvgOperator.input);
+ SmtpOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpOutputOperator());
+ dag.addStream("Aggregates", prereqAverageOper.outputPort, averageOperator.inputPort);
+ dag.addStream("Alerts", averageOperator.smtpAlert, smtpOutputOperator.input);
+ return prereqAverageOper;
+ }
+
+ /**
+ * Create the DAG
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ InputReceiver randomGen = dag.addOperator("Receiver", InputReceiver.class);
+ DimensionGenerator dimensionGenerator = dag.addOperator("DimensionsGenerator", DimensionGenerator.class);
+ dag.addStream("Events", randomGen.outputInline, dimensionGenerator.inputPort);
+ MachineInfoAveragingPrerequisitesOperator prereqAverageOper = addAverageCalculation(dag, conf);
+ dag.addStream("DimensionalData", dimensionGenerator.outputInline, prereqAverageOper.inputPort);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java
new file mode 100644
index 0000000..6c717c2
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/DimensionGenerator.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata;
+
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+
+/**
+ * <p>
+ * Information tuple generator with randomness.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+@SuppressWarnings("unused")
+public class DimensionGenerator extends BaseOperator
+{
+ public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
+ public transient DefaultOutputPort<MachineInfo> output = new DefaultOutputPort<>();
+ private int threshold = 90;
+
+ public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
+ {
+
+ @Override
+ public void process(MachineInfo tuple)
+ {
+ emitDimensions(tuple);
+ }
+
+ };
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ }
+
+ /**
+ * This returns the threshold value set
+ * @return
+ */
+ public int getThreshold()
+ {
+ return threshold;
+ }
+
+ /**
+ * This function sets the threshold value. This value is used to check the maximum value for cpu/ram/hdd
+ * @param threshold
+ */
+ public void setThreshold(int threshold)
+ {
+ this.threshold = threshold;
+ }
+
+ /**
+ * This function takes in the tuple from upstream operator and generates tuples with different dimension combinations
+ *
+ * @param tuple
+ */
+ private void emitDimensions(MachineInfo tuple)
+ {
+ MachineKey tupleKey = tuple.getMachineKey();
+
+ for (int i = 0; i < 64; i++) {
+ MachineKey machineKey = new MachineKey(tupleKey.getTimeKey(),tupleKey.getDay());
+ if ((i & 1) != 0) {
+ machineKey.setCustomer(tupleKey.getCustomer());
+ }
+ if ((i & 2) != 0) {
+ machineKey.setProduct(tupleKey.getProduct());
+ }
+ if ((i & 4) != 0) {
+ machineKey.setOs(tupleKey.getOs());
+ }
+ if ((i & 8) != 0) {
+ machineKey.setDeviceId(tupleKey.getDeviceId());
+ }
+ if ((i & 16) != 0) {
+ machineKey.setSoftware1(tupleKey.getSoftware1());
+ }
+ if ((i & 32) != 0) {
+ machineKey.setSoftware2(tupleKey.getSoftware2());
+ }
+
+ int cpu = tuple.getCpu();
+ int ram = tuple.getRam();
+ int hdd = tuple.getHdd();
+ MachineInfo machineInfo = new MachineInfo();
+ machineInfo.setMachineKey(machineKey);
+ machineInfo.setCpu((cpu < threshold) ? cpu : threshold);
+ machineInfo.setRam((ram < threshold) ? ram : threshold);
+ machineInfo.setHdd((hdd < threshold) ? hdd : threshold);
+ outputInline.emit(machineInfo);
+ output.emit(machineInfo);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java
new file mode 100644
index 0000000..263db55
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/InputReceiver.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Random;
+import java.util.TimeZone;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>
+ * Information tuple generator with randomness.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+@SuppressWarnings("unused")
+public class InputReceiver extends BaseOperator implements InputOperator
+{
+ private static final Logger logger = LoggerFactory.getLogger(InputReceiver.class);
+
+ public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
+ private final Random randomGen = new Random();
+
+ private int customerMin = 1;
+ private int customerMax = 5;
+ private int productMin = 4;
+ private int productMax = 6;
+ private int osMin = 10;
+ private int osMax = 12;
+ private int software1Min = 10;
+ private int software1Max = 12;
+ private int software2Min = 12;
+ private int software2Max = 14;
+ private int software3Min = 4;
+ private int software3Max = 6;
+
+ private int deviceIdMin = 1;
+ private int deviceIdMax = 50;
+
+ private int tupleBlastSize = 1001;
+
+ private static final DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
+ private static final DateFormat dayDateFormat = new SimpleDateFormat("d");
+
+ static {
+ TimeZone tz = TimeZone.getTimeZone("GMT");
+ minuteDateFormat.setTimeZone(tz);
+ dayDateFormat.setTimeZone(tz);
+
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ int count = 0;
+ Calendar calendar = Calendar.getInstance();
+ Date date = calendar.getTime();
+ String timeKey = minuteDateFormat.format(date);
+ String day = dayDateFormat.format(date);
+
+ while (count < tupleBlastSize) {
+ randomGen.setSeed(System.currentTimeMillis());
+
+ int customerVal = genCustomerId();
+ int productVal = genProductVer();
+ int osVal = genOsVer();
+ int software1Val = genSoftware1Ver();
+ int software2Val = genSoftware2Ver();
+ int software3Val = genSoftware3Ver();
+ int deviceIdVal = genDeviceId();
+
+ int cpuVal = genCpu(calendar);
+ int ramVal = genRam(calendar);
+ int hddVal = genHdd(calendar);
+
+ MachineKey machineKey = new MachineKey(timeKey, day);
+
+ machineKey.setCustomer(customerVal);
+ machineKey.setProduct(productVal);
+ machineKey.setOs(osVal);
+ machineKey.setDeviceId(deviceIdVal);
+ machineKey.setSoftware1(software1Val);
+ machineKey.setSoftware2(software2Val);
+ machineKey.setSoftware3(software3Val);
+ MachineInfo machineInfo = new MachineInfo();
+ machineInfo.setMachineKey(machineKey);
+ machineInfo.setCpu(cpuVal);
+ machineInfo.setRam(ramVal);
+ machineInfo.setHdd(hddVal);
+
+ outputInline.emit(machineInfo);
+
+ count++;
+ }
+ }
+
+ private int genCustomerId()
+ {
+ int range = customerMax - customerMin + 1;
+ return customerMin + randomGen.nextInt(range);
+ }
+
+ private int genProductVer()
+ {
+ int range = productMax - productMin + 1;
+ return productMin + randomGen.nextInt(range);
+ }
+
+ private int genOsVer()
+ {
+ int range = osMax - osMin + 1;
+ return osMin + randomGen.nextInt(range);
+ }
+
+ private int genSoftware3Ver()
+ {
+ int range = software3Max - software3Min + 1;
+ return software3Min + randomGen.nextInt(range);
+ }
+
+ private int genDeviceId()
+ {
+ int range = deviceIdMax - deviceIdMin + 1;
+ return deviceIdMin + randomGen.nextInt(range);
+ }
+
+ private int genSoftware1Ver()
+ {
+ int range = software1Max - software1Min + 1;
+ return software1Min + randomGen.nextInt(range);
+ }
+
+ private int genSoftware2Ver()
+ {
+ int range = software2Max - software2Min + 1;
+ return software2Min + randomGen.nextInt(range);
+ }
+
+ private int genCpu(Calendar cal)
+ {
+ int minute = cal.get(Calendar.MINUTE);
+ int second;
+ int range = minute / 2 + 19;
+ if (minute / 17 == 0) {
+ second = cal.get(Calendar.SECOND);
+ return (30 + randomGen.nextInt(range) + (minute % 7) - (second % 11));
+ } else if (minute / 47 == 0) {
+ second = cal.get(Calendar.SECOND);
+ return (7 + randomGen.nextInt(range) + (minute % 7) - (second % 7));
+ } else {
+ second = cal.get(Calendar.SECOND);
+ return (randomGen.nextInt(range) + (minute % 19) + (second % 7));
+ }
+ }
+
+ private int genRam(Calendar cal)
+ {
+ int minute = cal.get(Calendar.MINUTE);
+ int second;
+ int range = minute + 1;
+ if (minute / 23 == 0) {
+ second = cal.get(Calendar.SECOND);
+ return (20 + randomGen.nextInt(range) + (minute % 5) - (second % 11));
+ } else if (minute / 37 == 0) {
+ second = cal.get(Calendar.SECOND);
+ return (11 + randomGen.nextInt(60) - (minute % 5) - (second % 11));
+ } else {
+ second = cal.get(Calendar.SECOND);
+ return (randomGen.nextInt(range) + (minute % 17) + (second % 11));
+ }
+ }
+
+ private int genHdd(Calendar cal)
+ {
+ int minute = cal.get(Calendar.MINUTE);
+ int second;
+ int range = minute / 2 + 1;
+ if (minute / 37 == 0) {
+ second = cal.get(Calendar.SECOND);
+ return (25 + randomGen.nextInt(range) - minute % 7 - second % 11);
+ } else {
+ second = cal.get(Calendar.SECOND);
+ return (randomGen.nextInt(range) + minute % 23 + second % 11);
+ }
+ }
+
+ /**
+ * This method returns the minimum value for customer
+ *
+ * @return
+ */
+ public int getCustomerMin()
+ {
+ return customerMin;
+ }
+
+ /**
+ * This method is used to set the minimum value for customer
+ *
+ * @param customerMin the minimum customer value
+ */
+ public void setCustomerMin(int customerMin)
+ {
+ this.customerMin = customerMin;
+ }
+
+ /**
+ * This method returns the max value for customer
+ *
+ * @return
+ */
+ public int getCustomerMax()
+ {
+ return customerMax;
+ }
+
+ /**
+ * This method is used to set the max value for customer
+ *
+ * @param customerMax the max customer value
+ */
+ public void setCustomerMax(int customerMax)
+ {
+ this.customerMax = customerMax;
+ }
+
+ /**
+ * This method returns the minimum value for product
+ *
+ * @return
+ */
+ public int getProductMin()
+ {
+ return productMin;
+ }
+
+ /**
+ * This method is used to set the minimum value for product
+ *
+ * @param productMin the minimum product value
+ */
+ public void setProductMin(int productMin)
+ {
+ this.productMin = productMin;
+ }
+
+ /**
+ * This method returns the max value for product
+ *
+ * @return
+ */
+ public int getProductMax()
+ {
+ return productMax;
+ }
+
+ /**
+ * This method is used to set the max value for product
+ *
+ * @param productMax the max product value
+ */
+ public void setProductMax(int productMax)
+ {
+ this.productMax = productMax;
+ }
+
+ /**
+ * This method returns the minimum value for OS
+ *
+ * @return
+ */
+ public int getOsMin()
+ {
+ return osMin;
+ }
+
+ /**
+ * This method is used to set the minimum value for OS
+ *
+ * @param osMin the min OS value
+ */
+ public void setOsMin(int osMin)
+ {
+ this.osMin = osMin;
+ }
+
+ /**
+ * This method returns the max value for OS
+ *
+ * @return
+ */
+ public int getOsMax()
+ {
+ return osMax;
+ }
+
+ /**
+ * This method is used to set the max value for OS
+ *
+ * @param osMax the max OS value
+ */
+ public void setOsMax(int osMax)
+ {
+ this.osMax = osMax;
+ }
+
+ /**
+ * This method returns the minimum value for software1
+ *
+ * @return
+ */
+ public int getSoftware1Min()
+ {
+ return software1Min;
+ }
+
+ /**
+ * This method is used to set the minimum value for software1
+ *
+ * @param software1Min the minimum software1 value
+ */
+ public void setSoftware1Min(int software1Min)
+ {
+ this.software1Min = software1Min;
+ }
+
+ /**
+ * This method returns the max value for software1
+ *
+ * @return
+ */
+ public int getSoftware1Max()
+ {
+ return software1Max;
+ }
+
+ /**
+ * This method is used to set the max value for software1
+ *
+ * @param software1Max the max software1 value
+ */
+ public void setSoftware1Max(int software1Max)
+ {
+ this.software1Max = software1Max;
+ }
+
+ /**
+ * This method returns the minimum value for software2
+ *
+ * @return
+ */
+ public int getSoftware2Min()
+ {
+ return software2Min;
+ }
+
+ /**
+ * This method is used to set the minimum value for software2
+ *
+ * @param software2Min the minimum software2 value
+ */
+ public void setSoftware2Min(int software2Min)
+ {
+ this.software2Min = software2Min;
+ }
+
+ /**
+ * This method returns the max value for software2
+ *
+ * @return
+ */
+ public int getSoftware2Max()
+ {
+ return software2Max;
+ }
+
+ /**
+ * This method is used to set the max value for software2
+ *
+ * @param software2Max the max software2 value
+ */
+ public void setSoftware2Max(int software2Max)
+ {
+ this.software2Max = software2Max;
+ }
+
+ /**
+ * This method returns the minimum value for software3
+ *
+ * @return
+ */
+ public int getSoftware3Min()
+ {
+ return software3Min;
+ }
+
+ /**
+ * This method is used to set the minimum value for software3
+ *
+ * @param software3Min the minimum software3 value
+ */
+ public void setSoftware3Min(int software3Min)
+ {
+ this.software3Min = software3Min;
+ }
+
+ /**
+ * This method returns the max value for software3
+ *
+ * @return
+ */
+ public int getSoftware3Max()
+ {
+ return software3Max;
+ }
+
+ /**
+ * This method is used to set the max value for software3
+ *
+ * @param software3Max the max software3 value
+ */
+ public void setSoftware3Max(int software3Max)
+ {
+ this.software3Max = software3Max;
+ }
+
+ /**
+ * This method returns the minimum value for deviceId
+ *
+ * @return
+ */
+ public int getDeviceIdMin()
+ {
+ return deviceIdMin;
+ }
+
+ /**
+ * This method is used to set the minimum value for deviceId
+ *
+ * @param deviceIdMin the minimum deviceId value
+ */
+ public void setDeviceIdMin(int deviceIdMin)
+ {
+ this.deviceIdMin = deviceIdMin;
+ }
+
+ /**
+ * This method returns the max value for deviceId
+ *
+ * @return
+ */
+ public int getDeviceIdMax()
+ {
+ return deviceIdMax;
+ }
+
+ /**
+ * This method is used to set the max value for deviceId
+ *
+ * @param deviceIdMax the max deviceId value
+ */
+ public void setDeviceIdMax(int deviceIdMax)
+ {
+ this.deviceIdMax = deviceIdMax;
+ }
+
+ /**
+ * @return the tupleBlastSize
+ */
+ public int getTupleBlastSize()
+ {
+ return tupleBlastSize;
+ }
+
+ /**
+ * @param tupleBlastSize the tupleBlastSize to set
+ */
+ public void setTupleBlastSize(int tupleBlastSize)
+ {
+ this.tupleBlastSize = tupleBlastSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java
new file mode 100644
index 0000000..3c12335
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/AverageData.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.data;
+
+
+/**
+ * This class stores the value of sum and the count of values summed.
+ * <p>
+ * AverageData class.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+public class AverageData
+{
+
+ private long cpu;
+ private long hdd;
+ private long ram;
+ private long count;
+
+ /**
+ * This is default constructor that sets the sum and count to 0
+ */
+ public AverageData()
+ {
+
+ }
+
+ /**
+ * This constructor takes the value of sum and count and initialize the local attributes to corresponding values
+ *
+ * @param count
+ * the value of count
+ */
+ public AverageData(long cpu,long hdd,long ram, long count)
+ {
+ this.cpu = cpu;
+ this.ram = ram;
+ this.hdd = hdd;
+ this.count = count;
+ }
+
+ public long getCpu()
+ {
+ return cpu;
+ }
+
+ public void setCpu(long cpu)
+ {
+ this.cpu = cpu;
+ }
+
+ public long getHdd()
+ {
+ return hdd;
+ }
+
+ public void setHdd(long hdd)
+ {
+ this.hdd = hdd;
+ }
+
+ public long getRam()
+ {
+ return ram;
+ }
+
+ public void setRam(long ram)
+ {
+ this.ram = ram;
+ }
+
+ /**
+ * This returns the value of count
+ * @return
+ */
+ public long getCount()
+ {
+ return count;
+ }
+
+ /**
+ * This method sets the value of count
+ * @param count
+ */
+ public void setCount(long count)
+ {
+ this.count = count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java
new file mode 100644
index 0000000..3952b70
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineInfo.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.data;
+
+/**
+ * This class stores the cpu% usage, ram% usage, hdd% usage and key information about a particular machine
+ * <p>
+ * MachineInfo class.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+public class MachineInfo
+{
+ private MachineKey machineKey;
+ private int cpu;
+ private int ram;
+ private int hdd;
+
+ /**
+ * This default constructor
+ */
+ public MachineInfo()
+ {
+ }
+
+ /**
+ * This constructor takes MachineKey as input and initialize local attributes
+ *
+ * @param machineKey
+ * the MachineKey instance
+ */
+ public MachineInfo(MachineKey machineKey)
+ {
+ this.machineKey = machineKey;
+ }
+
+ /**
+ * This constructor takes MachineKey, cpu usage, ram usage, hdd usage as input and initialize local attributes
+ *
+ * @param machineKey
+ * the MachineKey instance
+ * @param cpu
+ * the CPU% usage
+ * @param ram
+ * the RAM% usage
+ * @param hdd
+ * the HDD% usage
+ */
+ public MachineInfo(MachineKey machineKey, int cpu, int ram, int hdd)
+ {
+ this.machineKey = machineKey;
+ this.cpu = cpu;
+ this.ram = ram;
+ this.hdd = hdd;
+ }
+
+ /**
+ * This method returns the MachineKey
+ *
+ * @return
+ */
+ public MachineKey getMachineKey()
+ {
+ return machineKey;
+ }
+
+ /**
+ * This method sets the MachineKey
+ *
+ * @param machineKey
+ * the MachineKey instance
+ */
+ public void setMachineKey(MachineKey machineKey)
+ {
+ this.machineKey = machineKey;
+ }
+
+ /**
+ * This method returns the CPU% usage
+ *
+ * @return
+ */
+ public int getCpu()
+ {
+ return cpu;
+ }
+
+ /**
+ * This method sets the CPU% usage
+ *
+ * @param cpu
+ * the CPU% usage
+ */
+ public void setCpu(int cpu)
+ {
+ this.cpu = cpu;
+ }
+
+ /**
+ * This method returns the RAM% usage
+ *
+ * @return
+ */
+ public int getRam()
+ {
+ return ram;
+ }
+
+ /**
+ * This method sets the RAM% usage
+ *
+ * @param ram
+ * the RAM% usage
+ */
+ public void setRam(int ram)
+ {
+ this.ram = ram;
+ }
+
+ /**
+ * This method returns the HDD% usage
+ *
+ * @return
+ */
+ public int getHdd()
+ {
+ return hdd;
+ }
+
+ /**
+ * This method sets the HDD% usage
+ *
+ * @param hdd
+ * the HDD% usage
+ */
+ public void setHdd(int hdd)
+ {
+ this.hdd = hdd;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java
new file mode 100644
index 0000000..2bf0a53
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/MachineKey.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.data;
+
+/**
+ * This class stores the information about various softwares, deviceIds, OS of the device
+ * <p>
+ * MachineKey class.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+public class MachineKey
+{
+
+ private Integer customer;
+ private Integer product;
+ private Integer os;
+ private Integer software1;
+ private Integer software2;
+ private Integer software3;
+ private Integer deviceId;
+ private String timeKey;
+ private String day;
+
+ /**
+ * This constructor takes the format in which time has to be captured and the day when this instance is created
+ *
+ * @param timeKey the format in which time has to be captured
+ * @param day the day when this instance is created
+ */
+ public MachineKey(String timeKey, String day)
+ {
+ this.timeKey = timeKey;
+ this.day = day;
+ }
+
+ /**
+ * This is default constructor
+ */
+ public MachineKey()
+ {
+ }
+
+ /**
+ * This constructor takes format in which time has to be captured, the day when this instance is created, the customer
+ * id, product Id on the device, OS version on the device, software1 version on the device, software2 version on the device,
+ * software3 version on the device, deviceId on the device,
+ *
+ * @param timeKey the format in which time has to be captured
+ * @param day the day when this instance is created
+ * @param customer the customer Id
+ * @param product product Id
+ * @param os OS version
+ * @param software1 software1 version
+ * @param software2 software2 version
+ * @param software3 software3 version
+ * @param deviceId deviceId
+ */
+ public MachineKey(String timeKey, String day, Integer customer, Integer product, Integer os, Integer software1, Integer software2, Integer software3, Integer deviceId)
+ {
+ this.timeKey = timeKey;
+ this.day = day;
+ this.customer = customer;
+ this.product = product;
+ this.os = os;
+ this.software1 = software1;
+ this.software2 = software2;
+ this.software3 = software3;
+ this.deviceId = deviceId;
+ }
+
+ /**
+ * This method returns the format in which the time is captured. The time is the time when this instance of MachineKey
+ * was generated. For e.g. HHmm to capture Hour and minute
+ *
+ * @return
+ */
+ public String getTimeKey()
+ {
+ return timeKey;
+ }
+
+ /**
+ * This method sets the format in which the time is captured. The time is the time when this instance of MachineKey
+ * was generated. For e.g. HHmm to capture Hour and minute
+ *
+ * @param timeKey
+ * the value of format
+ */
+ public void setTimeKey(String timeKey)
+ {
+ this.timeKey = timeKey;
+ }
+
+ /**
+ * This method returns the day of the month when this instance of MachineKey was generated
+ *
+ * @return
+ */
+ public String getDay()
+ {
+ return day;
+ }
+
+ /**
+ * This method sets the day of the month when this instance of MachineKey was generated
+ *
+ * @param day
+ * the day of the month
+ */
+ public void setDay(String day)
+ {
+ this.day = day;
+ }
+
+ /**
+ * This method returns the customer Id
+ *
+ * @return
+ */
+ public Integer getCustomer()
+ {
+ return customer;
+ }
+
+ /**
+ * This method sets the customer Id
+ *
+ * @param customer
+ * the customer Id
+ */
+ public void setCustomer(Integer customer)
+ {
+ this.customer = customer;
+ }
+
+ /**
+ * This method returns product on the device
+ *
+ * @return
+ */
+ public Integer getProduct()
+ {
+ return product;
+ }
+
+ /**
+ * This method sets the product on the device
+ *
+ * @param product
+ * the value of product
+ */
+ public void setProduct(Integer product)
+ {
+ this.product = product;
+ }
+
+ /**
+ * This method returns the OS version on the device
+ *
+ * @return
+ */
+ public Integer getOs()
+ {
+ return os;
+ }
+
+ /**
+ * This method sets the OS version on the device
+ *
+ * @param os
+ * OS version
+ */
+ public void setOs(Integer os)
+ {
+ this.os = os;
+ }
+
+ /**
+ * This method returns the version of the software1 on the device
+ *
+ * @return
+ */
+ public Integer getSoftware1()
+ {
+ return software1;
+ }
+
+ /**
+ * This method sets the version of the software1 on the device
+ *
+ * @param software1 the version of the software1
+ */
+ public void setSoftware1(Integer software1)
+ {
+ this.software1 = software1;
+ }
+
+ /**
+ * This method returns the version of the software2 on the device
+ *
+ * @return
+ */
+ public Integer getSoftware2()
+ {
+ return software2;
+ }
+
+ /**
+ * This method sets the version of the software2 on the device
+ *
+ * @param software2
+ * the version of the software2
+ */
+ public void setSoftware2(Integer software2)
+ {
+ this.software2 = software2;
+ }
+
+ /**
+ * This method returns the version of the software3 on the device
+ *
+ * @return
+ */
+ public Integer getSoftware3()
+ {
+ return software3;
+ }
+
+ /**
+ * This method sets the version of the software3 on the device
+ *
+ * @param software3
+ * the version of the software3
+ */
+ public void setSoftware3(Integer software3)
+ {
+ this.software3 = software3;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int key = 0;
+ if (customer != null) {
+ key |= (1 << 31);
+ key ^= customer;
+ }
+ if (product != null) {
+ key |= (1 << 30);
+ key ^= product;
+ }
+ if (os != null) {
+ key |= (1 << 29);
+ key ^= os;
+ }
+ if (software1 != null) {
+ key |= (1 << 28);
+ key ^= software1;
+ }
+ if (software2 != null) {
+ key |= (1 << 27);
+ key ^= software2;
+ }
+ if (software3 != null) {
+ key |= (1 << 26);
+ key ^= software3;
+ }
+ if (deviceId != null) {
+ key |= (1 << 25);
+ key ^= deviceId;
+ }
+ if (timeKey != null) {
+ key |= (1 << 24);
+ key ^= timeKey.hashCode();
+ }
+ if (day != null) {
+ key |= (1 << 23);
+ key ^= day.hashCode();
+ }
+
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof MachineKey)) {
+ return false;
+ }
+ MachineKey mkey = (MachineKey)obj;
+ return checkStringEqual(this.timeKey, mkey.timeKey) && checkStringEqual(this.day, mkey.day) && checkIntEqual(this.customer, mkey.customer) && checkIntEqual(this.product, mkey.product) && checkIntEqual(this.os, mkey.os) && checkIntEqual(this.software1, mkey.software1) && checkIntEqual(this.software2, mkey.software2) && checkIntEqual(this.software3, mkey.software3) && checkIntEqual(this.deviceId, mkey.deviceId);
+ }
+
+ private boolean checkIntEqual(Integer a, Integer b)
+ {
+ if ((a == null) && (b == null)) {
+ return true;
+ }
+ if ((a != null) && a.equals(b)) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean checkStringEqual(String a, String b)
+ {
+ if ((a == null) && (b == null)) {
+ return true;
+ }
+ if ((a != null) && a.equals(b)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder(timeKey);
+ if (customer != null) {
+ sb.append("|0:").append(customer);
+ }
+ if (product != null) {
+ sb.append("|1:").append(product);
+ }
+ if (os != null) {
+ sb.append("|2:").append(os);
+ }
+ if (software1 != null) {
+ sb.append("|3:").append(software1);
+ }
+ if (software2 != null) {
+ sb.append("|4:").append(software2);
+ }
+ if (software3 != null) {
+ sb.append("|5:").append(software3);
+ }
+ if (deviceId != null) {
+ sb.append("|6:").append(deviceId);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * This method returns the deviceId of the device
+ * @return The deviceId
+ */
+ public Integer getDeviceId()
+ {
+ return deviceId;
+ }
+
+ /**
+ * This method sets the deviceId of the device
+ *
+ * @param deviceId
+ */
+ public void setDeviceId(Integer deviceId)
+ {
+ this.deviceId = deviceId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java
new file mode 100644
index 0000000..2fd457a
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/data/ResourceType.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.data;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+/**
+ * This class captures the resources whose usage is collected for each device
+ * <p>ResourceType class.</p>
+ *
+ * @since 0.3.5
+ */
+public enum ResourceType
+{
+
+ CPU("cpu"), RAM("ram"), HDD("hdd");
+
+ private static Map<String, ResourceType> descToResource = Maps.newHashMap();
+
+ static {
+ for (ResourceType type : ResourceType.values()) {
+ descToResource.put(type.desc, type);
+ }
+ }
+
+ private String desc;
+
+ private ResourceType(String desc)
+ {
+ this.desc = desc;
+ }
+
+ @Override
+ public String toString()
+ {
+ return desc;
+ }
+
+ /**
+ * This method returns ResourceType for the given description
+ * @param desc the description
+ * @return
+ */
+ public static ResourceType getResourceTypeOf(String desc)
+ {
+ return descToResource.get(desc);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java
new file mode 100644
index 0000000..34a9514
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/CalculatorOperator.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.operator;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+import org.apache.apex.examples.machinedata.data.ResourceType;
+import org.apache.apex.examples.machinedata.util.DataTable;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * <p>
+ * CalculatorOperator class.
+ * </p>
+ *
+ * @since 0.3.5
+ */
+public class CalculatorOperator extends BaseOperator
+{
+
+ private final DataTable<MachineKey, ResourceType, List<Integer>> data = new DataTable<>();
+
+ @Min(1)
+ @Max(99)
+ private int kthPercentile = 95; // kth percentile
+ private boolean computePercentile;
+ private boolean computeSD;
+ private boolean computeMax;
+
+ private int percentileThreshold = 80;
+ private int sdThreshold = 70;
+ private int maxThreshold = 99;
+
+ public final transient DefaultInputPort<MachineInfo> dataPort = new DefaultInputPort<MachineInfo>()
+ {
+ @Override
+ public void process(MachineInfo tuple)
+ {
+ addDataToCache(tuple);
+ }
+
+ /**
+ * Stream codec used for partitioning.
+ */
+ @Override
+ public StreamCodec<MachineInfo> getStreamCodec()
+ {
+ return new MachineInfoStreamCodec();
+ }
+ };
+
+ public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> percentileOutputPort = new DefaultOutputPort<>();
+
+ public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> sdOutputPort = new DefaultOutputPort<>();
+
+ public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Integer>>> maxOutputPort = new DefaultOutputPort<>();
+
+ public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>();
+
+ private void addDataToCache(MachineInfo tuple)
+ {
+ MachineKey machineKey = tuple.getMachineKey();
+ if (!data.containsRow(machineKey)) {
+ data.put(machineKey, ResourceType.CPU, Lists.<Integer>newArrayList());
+ data.put(machineKey, ResourceType.RAM, Lists.<Integer>newArrayList());
+ data.put(machineKey, ResourceType.HDD, Lists.<Integer>newArrayList());
+ }
+ data.get(machineKey, ResourceType.CPU).add(tuple.getCpu());
+ data.get(machineKey, ResourceType.RAM).add(tuple.getRam());
+ data.get(machineKey, ResourceType.HDD).add(tuple.getHdd());
+ }
+
+ @Override
+ public void endWindow()
+ {
+
+ if (computePercentile) {
+ for (MachineKey machineKey : data.rowKeySet()) {
+ Collections.sort(data.get(machineKey, ResourceType.CPU));
+ Collections.sort(data.get(machineKey, ResourceType.RAM));
+ Collections.sort(data.get(machineKey, ResourceType.HDD));
+
+ Map<ResourceType, Double> percentileData = Maps.newHashMap();
+ percentileData.put(ResourceType.CPU, getKthPercentile(data.get(machineKey, ResourceType.CPU)));
+ percentileData.put(ResourceType.RAM, getKthPercentile(data.get(machineKey, ResourceType.RAM)));
+ percentileData.put(ResourceType.HDD, getKthPercentile(data.get(machineKey, ResourceType.HDD)));
+ percentileOutputPort.emit(new KeyValPair<>(machineKey, percentileData));
+
+ for (ResourceType resourceType : percentileData.keySet()) {
+ double percentileValue = percentileData.get(resourceType);
+ if (percentileValue > percentileThreshold) {
+ emitAlert(resourceType, machineKey, percentileValue, "Percentile");
+ }
+ }
+ }
+ }
+ if (computeSD) {
+ for (MachineKey machineKey : data.rowKeySet()) {
+
+ Map<ResourceType, Double> sdData = Maps.newHashMap();
+
+ for (ResourceType resourceType : ResourceType.values()) {
+ sdData.put(resourceType, getSD(data.get(machineKey, resourceType)));
+ }
+ sdOutputPort.emit(new KeyValPair<>(machineKey, sdData));
+
+ for (ResourceType resourceType : sdData.keySet()) {
+ double sdValue = sdData.get(resourceType);
+ if (sdValue > sdThreshold) {
+ emitAlert(resourceType, machineKey, sdValue, "SD");
+ }
+ }
+ }
+ }
+ if (computeMax) {
+ for (MachineKey machineKey : data.rowKeySet()) {
+
+ Map<ResourceType, Integer> maxData = Maps.newHashMap();
+ maxData.put(ResourceType.CPU, Collections.max(data.get(machineKey, ResourceType.CPU)));
+ maxData.put(ResourceType.RAM, Collections.max(data.get(machineKey, ResourceType.RAM)));
+ maxData.put(ResourceType.HDD, Collections.max(data.get(machineKey, ResourceType.HDD)));
+
+ maxOutputPort.emit(new KeyValPair<>(machineKey, maxData));
+
+ for (ResourceType resourceType : maxData.keySet()) {
+ double sdValue = maxData.get(resourceType).doubleValue();
+ if (sdValue > maxThreshold) {
+ emitAlert(resourceType, machineKey, sdValue, "Max");
+ }
+ }
+ }
+ }
+ data.clear();
+ }
+
+ private void emitAlert(ResourceType type, MachineKey machineKey, double alertVal, String prefix)
+ {
+ BigDecimal decimalVal = new BigDecimal(alertVal);
+ decimalVal = decimalVal.setScale(2, BigDecimal.ROUND_HALF_UP);
+ String alertTime = machineKey.getDay() + machineKey.getTimeKey();
+ smtpAlert.emit(prefix + "-" + type.toString().toUpperCase() + " alert at " + alertTime + " " + type + " usage breached current usage: " + decimalVal.doubleValue() + "% threshold: " + percentileThreshold + "%\n\n" + machineKey);
+ }
+
+ private double getKthPercentile(List<Integer> sorted)
+ {
+
+ double val = (kthPercentile * sorted.size()) / 100.0;
+ if (val == (int)val) {
+ // Whole number
+ int idx = (int)val - 1;
+ return (sorted.get(idx) + sorted.get(idx + 1)) / 2.0;
+ } else {
+ int idx = (int)Math.round(val) - 1;
+ return sorted.get(idx);
+ }
+ }
+
+ private double getSD(List<Integer> data)
+ {
+ int sum = 0;
+ for (int i : data) {
+ sum += i;
+ }
+ double avg = sum / (data.size() * 1.0);
+ double sd = 0;
+ for (Integer point : data) {
+ sd += Math.pow(point - avg, 2);
+ }
+ return Math.sqrt(sd);
+ }
+
+ /**
+ * @param kVal the percentile which will be emitted by this operator
+ */
+ public void setKthPercentile(int kVal)
+ {
+ this.kthPercentile = kVal;
+ }
+
+ /**
+ * @param doCompute when true percentile will be computed
+ */
+ public void setComputePercentile(boolean doCompute)
+ {
+ this.computePercentile = doCompute;
+ }
+
+ /**
+ * @param doCompute when true standard deviation will be computed
+ */
+ public void setComputeSD(boolean doCompute)
+ {
+ this.computeSD = doCompute;
+ }
+
+ /**
+ * @param doCompute when true max will be computed
+ */
+ public void setComputeMax(boolean doCompute)
+ {
+ this.computeMax = doCompute;
+ }
+
+ /**
+ * @param threshold for percentile when breached will cause alert
+ */
+ public void setPercentileThreshold(int threshold)
+ {
+ this.percentileThreshold = threshold;
+ }
+
+ /**
+ * @param threshold for standard deviation when breached will cause alert
+ */
+ public void setSDThreshold(int threshold)
+ {
+ this.sdThreshold = threshold;
+ }
+
+ /**
+ * @param threshold for Max when breached will cause alert
+ */
+ public void setMaxThreshold(int threshold)
+ {
+ this.maxThreshold = threshold;
+ }
+
+ public static class MachineInfoStreamCodec extends KryoSerializableStreamCodec<MachineInfo> implements Serializable
+ {
+ public MachineInfoStreamCodec()
+ {
+ super();
+ }
+
+ @Override
+ public int getPartition(MachineInfo o)
+ {
+ return Objects.hashCode(o.getMachineKey().getCustomer(), o.getMachineKey().getOs(), o.getMachineKey().getProduct(), o.getMachineKey().getSoftware1(), o.getMachineKey().getSoftware2(), o.getMachineKey().getSoftware3());
+ }
+
+ private static final long serialVersionUID = 201411031403L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java
new file mode 100644
index 0000000..1de676f
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingOperator.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.operator;
+
+import java.math.BigDecimal;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.examples.machinedata.data.AverageData;
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.util.KeyHashValPair;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This class calculates the average for various resources across different devices for a given key
+ * <p>MachineInfoAveragingOperator class.</p>
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings("unused")
+public class MachineInfoAveragingOperator extends BaseOperator
+{
+
+ public static final String CPU = "cpu";
+ public static final String RAM = "ram";
+ public static final String HDD = "hdd";
+ public static final String DAY = "day";
+
+ private final transient Map<MachineKey, AverageData> dataMap = new HashMap<>();
+
+ public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>> outputPort = new DefaultOutputPort<>();
+
+ public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>();
+
+ private int threshold = 95;
+
+ /**
+ * Buffer all the tuples as is till end window gets called
+ */
+ public final transient DefaultInputPort<KeyHashValPair<MachineKey, AverageData>> inputPort = new DefaultInputPort<KeyHashValPair<MachineKey, AverageData>>()
+ {
+
+ @Override
+ public void process(KeyHashValPair<MachineKey, AverageData> tuple)
+ {
+ addTuple(tuple);
+ }
+ };
+
+ /**
+ * This method returns the threshold value
+ *
+ * @return
+ */
+ public int getThreshold()
+ {
+ return threshold;
+ }
+
+ /**
+ * This method sets the threshold value. If the average usage for any Resource is above this for a given key, then the alert is sent
+ *
+ * @param threshold the threshold value
+ */
+ public void setThreshold(int threshold)
+ {
+ this.threshold = threshold;
+ }
+
+ /**
+ * This adds the given tuple to the dataMap
+ *
+ * @param tuple input tuple
+ */
+ private void addTuple(KeyHashValPair<MachineKey, AverageData> tuple)
+ {
+ MachineKey key = tuple.getKey();
+ dataMap.put(key, tuple.getValue());
+ }
+
+ @Override
+ public void endWindow()
+ {
+ for (Map.Entry<MachineKey, AverageData> entry : dataMap.entrySet()) {
+ MachineKey key = entry.getKey();
+ AverageData averageResultMap = entry.getValue();
+ Map<String, String> averageResult = Maps.newHashMap();
+ long count = averageResultMap.getCount();
+ double average = averageResultMap.getCpu() / count;
+ averageResult.put(CPU, average + "");
+ emitAlert(average, CPU, key);
+ average = averageResultMap.getHdd() / count;
+ averageResult.put(HDD, average + "");
+ emitAlert(average, HDD, key);
+ average = averageResultMap.getRam() / count;
+ averageResult.put(RAM, average + "");
+ emitAlert(average, RAM, key);
+ averageResult.put(DAY, key.getDay());
+ outputPort.emit(new KeyValPair<>(key, averageResult));
+ }
+ dataMap.clear();
+ }
+
+ private void emitAlert(double average, String resourceType, MachineKey key)
+ {
+ if (average > threshold) {
+ BigDecimal bd = new BigDecimal(average);
+ bd = bd.setScale(2, BigDecimal.ROUND_HALF_UP);
+ String stime = key.getDay() + key.getTimeKey();
+ String skey = getKeyInfo(key);
+ smtpAlert.emit(resourceType.toUpperCase() + " alert at " + stime + " " + resourceType + " usage breached current usage: " + bd.doubleValue() + "% threshold: " + threshold + "%\n\n" + skey);
+ }
+ }
+
+ /**
+ * This method is used to artificially generate alerts
+ *
+ * @param genAlert
+ */
+ public void setGenAlert(boolean genAlert)
+ {
+ Calendar calendar = Calendar.getInstance();
+ long timestamp = System.currentTimeMillis();
+ calendar.setTimeInMillis(timestamp);
+ DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
+ Date date = calendar.getTime();
+ String timeKey = minuteDateFormat.format(date);
+ DateFormat dayDateFormat = new SimpleDateFormat("dd");
+ String day = dayDateFormat.format(date);
+
+ MachineKey alertKey = new MachineKey(timeKey, day);
+ alertKey.setCustomer(1);
+ alertKey.setProduct(5);
+ alertKey.setOs(10);
+ alertKey.setSoftware1(12);
+ alertKey.setSoftware2(14);
+ alertKey.setSoftware3(6);
+
+ MachineInfo machineInfo = new MachineInfo();
+ machineInfo.setMachineKey(alertKey);
+ machineInfo.setCpu(threshold + 1);
+ machineInfo.setRam(threshold + 1);
+ machineInfo.setHdd(threshold + 1);
+
+ smtpAlert.emit("CPU Alert: CPU Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
+ smtpAlert.emit("RAM Alert: RAM Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
+ smtpAlert.emit("HDD Alert: HDD Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
+ }
+
+ /**
+ * This method returns the String for a given MachineKey instance
+ *
+ * @param key MachineKey instance that needs to be converted to string
+ * @return
+ */
+ private String getKeyInfo(MachineKey key)
+ {
+ StringBuilder sb = new StringBuilder();
+ if (key instanceof MachineKey) {
+ MachineKey mkey = (MachineKey)key;
+ Integer customer = mkey.getCustomer();
+ if (customer != null) {
+ sb.append("customer: " + customer + "\n");
+ }
+ Integer product = mkey.getProduct();
+ if (product != null) {
+ sb.append("product version: " + product + "\n");
+ }
+ Integer os = mkey.getOs();
+ if (os != null) {
+ sb.append("os version: " + os + "\n");
+ }
+ Integer software1 = mkey.getSoftware1();
+ if (software1 != null) {
+ sb.append("software1 version: " + software1 + "\n");
+ }
+ Integer software2 = mkey.getSoftware2();
+ if (software2 != null) {
+ sb.append("software2 version: " + software2 + "\n");
+ }
+ Integer software3 = mkey.getSoftware3();
+ if (software3 != null) {
+ sb.append("software3 version: " + software3 + "\n");
+ }
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
new file mode 100644
index 0000000..14c8d25
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.examples.machinedata.data.AverageData;
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * This class calculates the partial sum and count for tuples generated by upstream Operator
+ * <p> MachineInfoAveragingPrerequisitesOperator class. </p>
+ *
+ * @since 0.3.5
+ */
+public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator
+{
+
+ // Aggregate sum of all values seen for a key.
+ private Map<MachineKey, AverageData> sums = new HashMap<>();
+
+ public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>>()
+ {
+ @Override
+ public Unifier<KeyHashValPair<MachineKey, AverageData>> getUnifier()
+ {
+ MachineInfoAveragingUnifier unifier = new MachineInfoAveragingUnifier();
+ return unifier;
+ }
+ };
+
+ public transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
+ {
+
+ @Override
+ public void process(MachineInfo tuple)
+ {
+ MachineKey key = tuple.getMachineKey();
+ AverageData averageData = sums.get(key);
+ if (averageData == null) {
+ averageData = new AverageData(tuple.getCpu(), tuple.getHdd(), tuple.getRam(), 1);
+ sums.put(key, averageData);
+ } else {
+ averageData.setCpu(averageData.getCpu() + tuple.getCpu());
+ averageData.setRam(averageData.getRam() + tuple.getRam());
+ averageData.setHdd(averageData.getHdd() + tuple.getHdd());
+ averageData.setCount(averageData.getCount() + 1);
+ }
+ }
+ };
+
+ @Override
+ public void endWindow()
+ {
+
+ for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) {
+ if (outputPort.isConnected()) {
+ outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue()));
+ }
+ }
+ sums.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java
new file mode 100644
index 0000000..02aabe7
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/operator/MachineInfoAveragingUnifier.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.examples.machinedata.data.AverageData;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.Unifier;
+
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * This class calculates the partial sum and count for a given key
+ * <p>MachineInfoAveragingUnifier class.</p>
+ *
+ * @since 0.9.0
+ */
+public class MachineInfoAveragingUnifier implements Unifier<KeyHashValPair<MachineKey, AverageData>>
+{
+
+ private Map<MachineKey, AverageData> sums = new HashMap<>();
+ public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<>();
+
+ @Override
+ public void beginWindow(long arg0)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void endWindow()
+ {
+ for (Map.Entry<MachineKey, AverageData> entry : sums.entrySet()) {
+ outputPort.emit(new KeyHashValPair<>(entry.getKey(), entry.getValue()));
+ }
+ sums.clear();
+
+ }
+
+ @Override
+ public void setup(OperatorContext arg0)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void teardown()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void process(KeyHashValPair<MachineKey, AverageData> arg0)
+ {
+ MachineKey tupleKey = arg0.getKey();
+ AverageData averageData = sums.get(tupleKey);
+ AverageData tupleValue = arg0.getValue();
+ if (averageData == null) {
+ sums.put(tupleKey, tupleValue);
+ } else {
+ averageData.setCpu(averageData.getCpu() + tupleValue.getCpu());
+ averageData.setRam(averageData.getRam() + tupleValue.getRam());
+ averageData.setHdd(averageData.getHdd() + tupleValue.getHdd());
+ averageData.setCount(averageData.getCount() + tupleValue.getCount());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java
new file mode 100644
index 0000000..0ff8985
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/Combinatorics.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generate combinations of elements for the given array of elements.
+ *
+ * Implements nCr = n! / (r! * (n-r)!)
+ *
+ * @since 0.3.5
+ */
+public class Combinatorics<T>
+{
+
+ private T[] values;
+ private int size = -1;
+ private List<T> result;
+ private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>();
+ private int resultMapSize = 0;
+
+ /**
+ * Generates all possible combinations with all the sizes.
+ *
+ * @param values
+ */
+ public Combinatorics(T[] values)
+ {
+ this.values = values;
+ this.size = -1;
+ this.result = new ArrayList<>();
+ }
+
+ /**
+ * Generates all possible combinations with the given size.
+ *
+ * @param values
+ * @param size
+ */
+ public Combinatorics(T[] values, int size)
+ {
+ this.values = values;
+ this.size = size;
+ this.result = new ArrayList<>();
+ }
+
+ public Map<Integer, List<T>> generate()
+ {
+
+ if (size == -1) {
+ size = values.length;
+ for (int i = 1; i <= size; i++) {
+ int[] tmp = new int[i];
+ Arrays.fill(tmp, -1);
+ generateCombinations(0, 0, tmp);
+ }
+ } else {
+ int[] tmp = new int[size];
+ Arrays.fill(tmp, -1);
+ generateCombinations(0, 0, tmp);
+ }
+ return resultMap;
+ }
+
+ public void generateCombinations(int start, int depth, int[] tmp)
+ {
+ if (depth == tmp.length) {
+ for (int j = 0; j < depth; j++) {
+ result.add(values[tmp[j]]);
+ }
+ resultMap.put(++resultMapSize, result);
+ result = new ArrayList<>();
+ return;
+ }
+ for (int i = start; i < values.length; i++) {
+ tmp[depth] = i;
+ generateCombinations(i + 1, depth + 1, tmp);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java
new file mode 100644
index 0000000..a147268
--- /dev/null
+++ b/examples/machinedata/src/main/java/org/apache/apex/examples/machinedata/util/DataTable.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata.util;
+
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Maps;
+
+/**
+ * <p>DataTable class.</p>
+ *
+ * @since 0.3.5
+ */
+public class DataTable<R,C,E>
+{
+
+ //machineKey, [cpu,ram,hdd] -> value
+ private final Map<R,Map<C,E>> table = Maps.newHashMap();
+
+ public boolean containsRow(R rowKey)
+ {
+ return table.containsKey(rowKey);
+ }
+
+ public void put(R rowKey,C colKey, E entry)
+ {
+ if (!containsRow(rowKey)) {
+ table.put(rowKey, Maps.<C,E>newHashMap());
+ }
+ table.get(rowKey).put(colKey, entry);
+ }
+
+ @Nullable
+ public E get(R rowKey, C colKey)
+ {
+ if (!containsRow(rowKey)) {
+ return null;
+ }
+ return table.get(rowKey).get(colKey);
+ }
+
+ public Set<R> rowKeySet()
+ {
+ return table.keySet();
+ }
+
+ public void clear()
+ {
+ table.clear();
+ }
+
+ public Map<R,Map<C,E>> getTable()
+ {
+ return table;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/main/resources/META-INF/properties.xml b/examples/machinedata/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..4ceb6b9
--- /dev/null
+++ b/examples/machinedata/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,139 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <property>
+ <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
+ <value>1000</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Receiver.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Receiver.attr.PARTITIONER</name>
+ <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.DimensionsGenerator.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.stream.Events.locality
+ </name>
+ <value>CONTAINER_LOCAL</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.DimensionsGenerator.inputport.inputPort.attr.PARTITION_PARALLEL
+ </name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Aggregator.inputport.inputPort.attr.PARTITION_PARALLEL
+ </name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Aggregator.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.AverageCalculator.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Persister.inputport.input.attr.PARTITION_PARALLEL
+ </name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Persister.attr.APPLICATION_WINDOW_COUNT
+ </name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Persister.store.dbIndex
+ </name>
+ <value>2</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Persister.store.host
+ </name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Persister.store.port
+ </name>
+ <value>6379</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.port.*.attr.QUEUE_CAPACITY</name>
+ <value>32000</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Alerter.from
+ </name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Alerter.subject
+ </name>
+ <value>Alert!!!</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Alerter.recipients.TO
+ </name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Alerter.content
+ </name>
+ <value>{}</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Alerter.smtpHost
+ </name>
+ <value>localhost</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Alerter.smtpPort
+ </name>
+ <value>25</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Alerter.useSsl
+ </name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.operator.Aggregator.outputport.outputPort.attr.UNIFIER_LIMIT
+ </name>
+ <value>8</value>
+ </property>
+ <property>
+ <name>dt.application.MachineDataExample.stream.DimensionalData.locality
+ </name>
+ <value>THREAD_LOCAL</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java b/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java
new file mode 100644
index 0000000..c69a2aa
--- /dev/null
+++ b/examples/machinedata/src/test/java/org/apache/apex/examples/machinedata/CalculatorOperatorTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.machinedata;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.examples.machinedata.data.MachineInfo;
+import org.apache.apex.examples.machinedata.data.MachineKey;
+import org.apache.apex.examples.machinedata.data.ResourceType;
+import org.apache.apex.examples.machinedata.operator.CalculatorOperator;
+
+
+import com.google.common.collect.ImmutableList;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.TimeBucketKey;
+
+/**
+ * @since 0.3.5
+ */
+public class CalculatorOperatorTest
+{
+ private static DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
+ private static Logger LOG = LoggerFactory.getLogger(CalculatorOperatorTest.class);
+
+ /**
+ * Test node logic emits correct results
+ */
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ CalculatorOperator calculatorOperator = new CalculatorOperator();
+ calculatorOperator.setup(null);
+
+ calculatorOperator.setComputePercentile(true);
+ calculatorOperator.setComputeMax(true);
+ calculatorOperator.setComputeSD(true);
+
+ testPercentile(calculatorOperator);
+ }
+
+ public void testPercentile(CalculatorOperator oper)
+ {
+
+ CollectorTestSink sortSink = new CollectorTestSink();
+ oper.percentileOutputPort.setSink(sortSink);
+ oper.setKthPercentile(50);
+ Calendar calendar = Calendar.getInstance();
+ Date date = calendar.getTime();
+ String timeKey = minuteDateFormat.format(date);
+ String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
+
+ Integer vs = new Integer(1);
+ MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
+
+ oper.beginWindow(0);
+
+ MachineInfo info = new MachineInfo(mk, 1, 1, 1);
+ oper.dataPort.process(info);
+
+ info.setCpu(2);
+ oper.dataPort.process(info);
+
+ info.setCpu(3);
+ oper.dataPort.process(info);
+
+ oper.endWindow();
+
+ Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
+ for (Object o : sortSink.collectedTuples) {
+ LOG.debug(o.toString());
+ KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
+ Assert.assertEquals("emitted value for 'cpu' was ", 2.0, keyValPair.getValue().get(ResourceType.CPU), 0);
+ Assert.assertEquals("emitted value for 'hdd' was ", 1.0, keyValPair.getValue().get(ResourceType.HDD), 0);
+ Assert.assertEquals("emitted value for 'ram' was ", 1.0, keyValPair.getValue().get(ResourceType.RAM), 0);
+
+ }
+ LOG.debug("Done percentile testing\n");
+
+ }
+
+ public void testStandarDeviation(CalculatorOperator oper)
+ {
+ CollectorTestSink sortSink = new CollectorTestSink();
+ oper.sdOutputPort.setSink(sortSink);
+ Calendar calendar = Calendar.getInstance();
+ Date date = calendar.getTime();
+ String timeKey = minuteDateFormat.format(date);
+ String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
+
+ Integer vs = new Integer(1);
+ MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
+
+ oper.beginWindow(0);
+
+ MachineInfo info = new MachineInfo(mk, 1, 1, 1);
+ oper.dataPort.process(info);
+
+ info.setCpu(2);
+ oper.dataPort.process(info);
+
+ info.setCpu(3);
+ oper.dataPort.process(info);
+
+ oper.endWindow();
+
+ Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
+ for (Object o : sortSink.collectedTuples) {
+ LOG.debug(o.toString());
+ KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
+ Assert.assertEquals("emitted value for 'cpu' was ", getSD(ImmutableList.of(1, 2, 3)), keyValPair.getValue().get(ResourceType.CPU), 0);
+ Assert.assertEquals("emitted value for 'hdd' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.HDD), 0);
+ Assert.assertEquals("emitted value for 'ram' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.RAM), 0);
+
+ }
+ LOG.debug("Done sd testing\n");
+
+ }
+
+ private final double getSD(List<Integer> input)
+ {
+ int sum = 0;
+ for (int i : input) {
+ sum += i;
+ }
+ double avg = sum / (input.size() * 1.0);
+ double sd = 0;
+ for (Integer point : input) {
+ sd += Math.pow(point - avg, 2);
+ }
+ return Math.sqrt(sd);
+ }
+
+ public void testMax(CalculatorOperator oper)
+ {
+ CollectorTestSink sortSink = new CollectorTestSink();
+ oper.maxOutputPort.setSink(sortSink);
+ Calendar calendar = Calendar.getInstance();
+ Date date = calendar.getTime();
+ String timeKey = minuteDateFormat.format(date);
+ String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
+
+ Integer vs = new Integer(1);
+ MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
+
+ oper.beginWindow(0);
+
+ MachineInfo info = new MachineInfo(mk, 1, 1, 1);
+ oper.dataPort.process(info);
+
+ info.setCpu(2);
+ oper.dataPort.process(info);
+
+ info.setCpu(3);
+ oper.dataPort.process(info);
+
+ oper.endWindow();
+
+ Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
+ for (Object o : sortSink.collectedTuples) {
+ LOG.debug(o.toString());
+ KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
+ Assert.assertEquals("emitted value for 'cpu' was ", 3, keyValPair.getValue().get(ResourceType.CPU), 0);
+ Assert.assertEquals("emitted value for 'hdd' was ", 1, keyValPair.getValue().get(ResourceType.HDD), 0);
+ Assert.assertEquals("emitted value for 'ram' was ", 1, keyValPair.getValue().get(ResourceType.RAM), 0);
+
+ }
+ LOG.debug("Done max testing\n");
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/machinedata/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/machinedata/src/test/resources/log4j.properties b/examples/machinedata/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/machinedata/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/pom.xml
----------------------------------------------------------------------
diff --git a/examples/mobile/pom.xml b/examples/mobile/pom.xml
new file mode 100644
index 0000000..cb40887
--- /dev/null
+++ b/examples/mobile/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>malhar-examples-mobile</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar Mobile Example</name>
+ <description></description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <skipTests>true</skipTests>
+ </properties>
+
+ <dependencies>
+ <!-- add your dependencies here -->
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>8.1.10.v20130312</version>
+ <scope>test</scope>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-websocket</artifactId>
+ <version>8.1.10.v20130312</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.1</version>
+ <type>jar</type>
+ </dependency>
+ </dependencies>
+
+</project>