You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:28 UTC
[23/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
deleted file mode 100644
index 55b299f..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingOperator;
-import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingPrerequisitesOperator;
-import com.datatorrent.lib.io.SmtpOutputOperator;
-
-/**
- * <p>
- * Resource monitor application.
- * </p>
- *
- * @since 0.3.5
- */
-@ApplicationAnnotation(name = "MachineDataDemo")
-@SuppressWarnings("unused")
-public class Application implements StreamingApplication
-{
-
- private static final Logger LOG = LoggerFactory.getLogger(Application.class);
-
- /**
- * This function sets up the DAG for calculating the average
- *
- * @param dag the DAG instance
- * @param conf the configuration instance
- * @return MachineInfoAveragingPrerequisitesOperator
- */
- private MachineInfoAveragingPrerequisitesOperator addAverageCalculation(DAG dag, Configuration conf)
- {
- MachineInfoAveragingPrerequisitesOperator prereqAverageOper = dag.addOperator("Aggregator", MachineInfoAveragingPrerequisitesOperator.class);
- MachineInfoAveragingOperator averageOperator = dag.addOperator("AverageCalculator", MachineInfoAveragingOperator.class);
- RedisKeyValPairOutputOperator<MachineKey, Map<String, String>> redisAvgOperator = dag.addOperator("Persister", new RedisKeyValPairOutputOperator<MachineKey, Map<String, String>>());
- dag.addStream("Average", averageOperator.outputPort, redisAvgOperator.input);
- SmtpOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpOutputOperator());
- dag.addStream("Aggregates", prereqAverageOper.outputPort, averageOperator.inputPort);
- dag.addStream("Alerts", averageOperator.smtpAlert, smtpOutputOperator.input);
- return prereqAverageOper;
- }
-
- /**
- * Create the DAG
- */
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- InputReceiver randomGen = dag.addOperator("Receiver", InputReceiver.class);
- DimensionGenerator dimensionGenerator = dag.addOperator("DimensionsGenerator", DimensionGenerator.class);
- dag.addStream("Events", randomGen.outputInline, dimensionGenerator.inputPort);
- MachineInfoAveragingPrerequisitesOperator prereqAverageOper = addAverageCalculation(dag, conf);
- dag.addStream("DimensionalData", dimensionGenerator.outputInline, prereqAverageOper.inputPort);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
deleted file mode 100644
index 75c2a02..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-
-/**
- * <p>
- * Information tuple generator with randomness.
- * </p>
- *
- * @since 0.3.5
- */
-@SuppressWarnings("unused")
-public class DimensionGenerator extends BaseOperator
-{
- public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
- public transient DefaultOutputPort<MachineInfo> output = new DefaultOutputPort<>();
- private int threshold = 90;
-
- public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
- {
-
- @Override
- public void process(MachineInfo tuple)
- {
- emitDimensions(tuple);
- }
-
- };
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- super.setup(context);
- }
-
- /**
- * This returns the threshold value set
- * @return
- */
- public int getThreshold()
- {
- return threshold;
- }
-
- /**
- * This function sets the threshold value. This value is used to check the maximum value for cpu/ram/hdd
- * @param threshold
- */
- public void setThreshold(int threshold)
- {
- this.threshold = threshold;
- }
-
- /**
- * This function takes in the tuple from upstream operator and generates tuples with different dimension combinations
- *
- * @param tuple
- */
- private void emitDimensions(MachineInfo tuple)
- {
- MachineKey tupleKey = tuple.getMachineKey();
-
- for (int i = 0; i < 64; i++) {
- MachineKey machineKey = new MachineKey(tupleKey.getTimeKey(),tupleKey.getDay());
- if ((i & 1) != 0) {
- machineKey.setCustomer(tupleKey.getCustomer());
- }
- if ((i & 2) != 0) {
- machineKey.setProduct(tupleKey.getProduct());
- }
- if ((i & 4) != 0) {
- machineKey.setOs(tupleKey.getOs());
- }
- if ((i & 8) != 0) {
- machineKey.setDeviceId(tupleKey.getDeviceId());
- }
- if ((i & 16) != 0) {
- machineKey.setSoftware1(tupleKey.getSoftware1());
- }
- if ((i & 32) != 0) {
- machineKey.setSoftware2(tupleKey.getSoftware2());
- }
-
- int cpu = tuple.getCpu();
- int ram = tuple.getRam();
- int hdd = tuple.getHdd();
- MachineInfo machineInfo = new MachineInfo();
- machineInfo.setMachineKey(machineKey);
- machineInfo.setCpu((cpu < threshold) ? cpu : threshold);
- machineInfo.setRam((ram < threshold) ? ram : threshold);
- machineInfo.setHdd((hdd < threshold) ? hdd : threshold);
- outputInline.emit(machineInfo);
- output.emit(machineInfo);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
deleted file mode 100644
index 85ec954..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Random;
-import java.util.TimeZone;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-
-/**
- * <p>
- * Information tuple generator with randomness.
- * </p>
- *
- * @since 0.3.5
- */
-@SuppressWarnings("unused")
-public class InputReceiver extends BaseOperator implements InputOperator
-{
- private static final Logger logger = LoggerFactory.getLogger(InputReceiver.class);
-
- public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
- private final Random randomGen = new Random();
-
- private int customerMin = 1;
- private int customerMax = 5;
- private int productMin = 4;
- private int productMax = 6;
- private int osMin = 10;
- private int osMax = 12;
- private int software1Min = 10;
- private int software1Max = 12;
- private int software2Min = 12;
- private int software2Max = 14;
- private int software3Min = 4;
- private int software3Max = 6;
-
- private int deviceIdMin = 1;
- private int deviceIdMax = 50;
-
- private int tupleBlastSize = 1001;
-
- private static final DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
- private static final DateFormat dayDateFormat = new SimpleDateFormat("d");
-
- static {
- TimeZone tz = TimeZone.getTimeZone("GMT");
- minuteDateFormat.setTimeZone(tz);
- dayDateFormat.setTimeZone(tz);
-
- }
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- super.setup(context);
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- super.beginWindow(windowId);
- }
-
- @Override
- public void emitTuples()
- {
- int count = 0;
- Calendar calendar = Calendar.getInstance();
- Date date = calendar.getTime();
- String timeKey = minuteDateFormat.format(date);
- String day = dayDateFormat.format(date);
-
- while (count < tupleBlastSize) {
- randomGen.setSeed(System.currentTimeMillis());
-
- int customerVal = genCustomerId();
- int productVal = genProductVer();
- int osVal = genOsVer();
- int software1Val = genSoftware1Ver();
- int software2Val = genSoftware2Ver();
- int software3Val = genSoftware3Ver();
- int deviceIdVal = genDeviceId();
-
- int cpuVal = genCpu(calendar);
- int ramVal = genRam(calendar);
- int hddVal = genHdd(calendar);
-
- MachineKey machineKey = new MachineKey(timeKey, day);
-
- machineKey.setCustomer(customerVal);
- machineKey.setProduct(productVal);
- machineKey.setOs(osVal);
- machineKey.setDeviceId(deviceIdVal);
- machineKey.setSoftware1(software1Val);
- machineKey.setSoftware2(software2Val);
- machineKey.setSoftware3(software3Val);
- MachineInfo machineInfo = new MachineInfo();
- machineInfo.setMachineKey(machineKey);
- machineInfo.setCpu(cpuVal);
- machineInfo.setRam(ramVal);
- machineInfo.setHdd(hddVal);
-
- outputInline.emit(machineInfo);
-
- count++;
- }
- }
-
- private int genCustomerId()
- {
- int range = customerMax - customerMin + 1;
- return customerMin + randomGen.nextInt(range);
- }
-
- private int genProductVer()
- {
- int range = productMax - productMin + 1;
- return productMin + randomGen.nextInt(range);
- }
-
- private int genOsVer()
- {
- int range = osMax - osMin + 1;
- return osMin + randomGen.nextInt(range);
- }
-
- private int genSoftware3Ver()
- {
- int range = software3Max - software3Min + 1;
- return software3Min + randomGen.nextInt(range);
- }
-
- private int genDeviceId()
- {
- int range = deviceIdMax - deviceIdMin + 1;
- return deviceIdMin + randomGen.nextInt(range);
- }
-
- private int genSoftware1Ver()
- {
- int range = software1Max - software1Min + 1;
- return software1Min + randomGen.nextInt(range);
- }
-
- private int genSoftware2Ver()
- {
- int range = software2Max - software2Min + 1;
- return software2Min + randomGen.nextInt(range);
- }
-
- private int genCpu(Calendar cal)
- {
- int minute = cal.get(Calendar.MINUTE);
- int second;
- int range = minute / 2 + 19;
- if (minute / 17 == 0) {
- second = cal.get(Calendar.SECOND);
- return (30 + randomGen.nextInt(range) + (minute % 7) - (second % 11));
- } else if (minute / 47 == 0) {
- second = cal.get(Calendar.SECOND);
- return (7 + randomGen.nextInt(range) + (minute % 7) - (second % 7));
- } else {
- second = cal.get(Calendar.SECOND);
- return (randomGen.nextInt(range) + (minute % 19) + (second % 7));
- }
- }
-
- private int genRam(Calendar cal)
- {
- int minute = cal.get(Calendar.MINUTE);
- int second;
- int range = minute + 1;
- if (minute / 23 == 0) {
- second = cal.get(Calendar.SECOND);
- return (20 + randomGen.nextInt(range) + (minute % 5) - (second % 11));
- } else if (minute / 37 == 0) {
- second = cal.get(Calendar.SECOND);
- return (11 + randomGen.nextInt(60) - (minute % 5) - (second % 11));
- } else {
- second = cal.get(Calendar.SECOND);
- return (randomGen.nextInt(range) + (minute % 17) + (second % 11));
- }
- }
-
- private int genHdd(Calendar cal)
- {
- int minute = cal.get(Calendar.MINUTE);
- int second;
- int range = minute / 2 + 1;
- if (minute / 37 == 0) {
- second = cal.get(Calendar.SECOND);
- return (25 + randomGen.nextInt(range) - minute % 7 - second % 11);
- } else {
- second = cal.get(Calendar.SECOND);
- return (randomGen.nextInt(range) + minute % 23 + second % 11);
- }
- }
-
- /**
- * This method returns the minimum value for customer
- *
- * @return
- */
- public int getCustomerMin()
- {
- return customerMin;
- }
-
- /**
- * This method is used to set the minimum value for customer
- *
- * @param customerMin the minimum customer value
- */
- public void setCustomerMin(int customerMin)
- {
- this.customerMin = customerMin;
- }
-
- /**
- * This method returns the max value for customer
- *
- * @return
- */
- public int getCustomerMax()
- {
- return customerMax;
- }
-
- /**
- * This method is used to set the max value for customer
- *
- * @param customerMax the max customer value
- */
- public void setCustomerMax(int customerMax)
- {
- this.customerMax = customerMax;
- }
-
- /**
- * This method returns the minimum value for product
- *
- * @return
- */
- public int getProductMin()
- {
- return productMin;
- }
-
- /**
- * This method is used to set the minimum value for product
- *
- * @param productMin the minimum product value
- */
- public void setProductMin(int productMin)
- {
- this.productMin = productMin;
- }
-
- /**
- * This method returns the max value for product
- *
- * @return
- */
- public int getProductMax()
- {
- return productMax;
- }
-
- /**
- * This method is used to set the max value for product
- *
- * @param productMax the max product value
- */
- public void setProductMax(int productMax)
- {
- this.productMax = productMax;
- }
-
- /**
- * This method returns the minimum value for OS
- *
- * @return
- */
- public int getOsMin()
- {
- return osMin;
- }
-
- /**
- * This method is used to set the minimum value for OS
- *
- * @param osMin the min OS value
- */
- public void setOsMin(int osMin)
- {
- this.osMin = osMin;
- }
-
- /**
- * This method returns the max value for OS
- *
- * @return
- */
- public int getOsMax()
- {
- return osMax;
- }
-
- /**
- * This method is used to set the max value for OS
- *
- * @param osMax the max OS value
- */
- public void setOsMax(int osMax)
- {
- this.osMax = osMax;
- }
-
- /**
- * This method returns the minimum value for software1
- *
- * @return
- */
- public int getSoftware1Min()
- {
- return software1Min;
- }
-
- /**
- * This method is used to set the minimum value for software1
- *
- * @param software1Min the minimum software1 value
- */
- public void setSoftware1Min(int software1Min)
- {
- this.software1Min = software1Min;
- }
-
- /**
- * This method returns the max value for software1
- *
- * @return
- */
- public int getSoftware1Max()
- {
- return software1Max;
- }
-
- /**
- * This method is used to set the max value for software1
- *
- * @param software1Max the max software1 value
- */
- public void setSoftware1Max(int software1Max)
- {
- this.software1Max = software1Max;
- }
-
- /**
- * This method returns the minimum value for software2
- *
- * @return
- */
- public int getSoftware2Min()
- {
- return software2Min;
- }
-
- /**
- * This method is used to set the minimum value for software2
- *
- * @param software2Min the minimum software2 value
- */
- public void setSoftware2Min(int software2Min)
- {
- this.software2Min = software2Min;
- }
-
- /**
- * This method returns the max value for software2
- *
- * @return
- */
- public int getSoftware2Max()
- {
- return software2Max;
- }
-
- /**
- * This method is used to set the max value for software2
- *
- * @param software2Max the max software2 value
- */
- public void setSoftware2Max(int software2Max)
- {
- this.software2Max = software2Max;
- }
-
- /**
- * This method returns the minimum value for software3
- *
- * @return
- */
- public int getSoftware3Min()
- {
- return software3Min;
- }
-
- /**
- * This method is used to set the minimum value for software3
- *
- * @param software3Min the minimum software3 value
- */
- public void setSoftware3Min(int software3Min)
- {
- this.software3Min = software3Min;
- }
-
- /**
- * This method returns the max value for software3
- *
- * @return
- */
- public int getSoftware3Max()
- {
- return software3Max;
- }
-
- /**
- * This method is used to set the max value for software3
- *
- * @param software3Max the max software3 value
- */
- public void setSoftware3Max(int software3Max)
- {
- this.software3Max = software3Max;
- }
-
- /**
- * This method returns the minimum value for deviceId
- *
- * @return
- */
- public int getDeviceIdMin()
- {
- return deviceIdMin;
- }
-
- /**
- * This method is used to set the minimum value for deviceId
- *
- * @param deviceIdMin the minimum deviceId value
- */
- public void setDeviceIdMin(int deviceIdMin)
- {
- this.deviceIdMin = deviceIdMin;
- }
-
- /**
- * This method returns the max value for deviceId
- *
- * @return
- */
- public int getDeviceIdMax()
- {
- return deviceIdMax;
- }
-
- /**
- * This method is used to set the max value for deviceId
- *
- * @param deviceIdMax the max deviceId value
- */
- public void setDeviceIdMax(int deviceIdMax)
- {
- this.deviceIdMax = deviceIdMax;
- }
-
- /**
- * @return the tupleBlastSize
- */
- public int getTupleBlastSize()
- {
- return tupleBlastSize;
- }
-
- /**
- * @param tupleBlastSize the tupleBlastSize to set
- */
- public void setTupleBlastSize(int tupleBlastSize)
- {
- this.tupleBlastSize = tupleBlastSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
deleted file mode 100644
index 3c74cc5..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.data;
-
-
-/**
- * This class stores the value of sum and the count of values summed.
- * <p>
- * AverageData class.
- * </p>
- *
- * @since 0.3.5
- */
-public class AverageData
-{
-
- private long cpu;
- private long hdd;
- private long ram;
- private long count;
-
- /**
- * This is default constructor that sets the sum and count to 0
- */
- public AverageData()
- {
-
- }
-
- /**
- * This constructor takes the value of sum and count and initialize the local attributes to corresponding values
- *
- * @param count
- * the value of count
- */
- public AverageData(long cpu,long hdd,long ram, long count)
- {
- this.cpu = cpu;
- this.ram = ram;
- this.hdd = hdd;
- this.count = count;
- }
-
- public long getCpu()
- {
- return cpu;
- }
-
- public void setCpu(long cpu)
- {
- this.cpu = cpu;
- }
-
- public long getHdd()
- {
- return hdd;
- }
-
- public void setHdd(long hdd)
- {
- this.hdd = hdd;
- }
-
- public long getRam()
- {
- return ram;
- }
-
- public void setRam(long ram)
- {
- this.ram = ram;
- }
-
- /**
- * This returns the value of count
- * @return
- */
- public long getCount()
- {
- return count;
- }
-
- /**
- * This method sets the value of count
- * @param count
- */
- public void setCount(long count)
- {
- this.count = count;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
deleted file mode 100644
index 6f02a24..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.data;
-
-/**
- * This class stores the cpu% usage, ram% usage, hdd% usage and key information about a particular machine
- * <p>
- * MachineInfo class.
- * </p>
- *
- * @since 0.3.5
- */
-public class MachineInfo
-{
- private MachineKey machineKey;
- private int cpu;
- private int ram;
- private int hdd;
-
- /**
- * This default constructor
- */
- public MachineInfo()
- {
- }
-
- /**
- * This constructor takes MachineKey as input and initialize local attributes
- *
- * @param machineKey
- * the MachineKey instance
- */
- public MachineInfo(MachineKey machineKey)
- {
- this.machineKey = machineKey;
- }
-
- /**
- * This constructor takes MachineKey, cpu usage, ram usage, hdd usage as input and initialize local attributes
- *
- * @param machineKey
- * the MachineKey instance
- * @param cpu
- * the CPU% usage
- * @param ram
- * the RAM% usage
- * @param hdd
- * the HDD% usage
- */
- public MachineInfo(MachineKey machineKey, int cpu, int ram, int hdd)
- {
- this.machineKey = machineKey;
- this.cpu = cpu;
- this.ram = ram;
- this.hdd = hdd;
- }
-
- /**
- * This method returns the MachineKey
- *
- * @return
- */
- public MachineKey getMachineKey()
- {
- return machineKey;
- }
-
- /**
- * This method sets the MachineKey
- *
- * @param machineKey
- * the MachineKey instance
- */
- public void setMachineKey(MachineKey machineKey)
- {
- this.machineKey = machineKey;
- }
-
- /**
- * This method returns the CPU% usage
- *
- * @return
- */
- public int getCpu()
- {
- return cpu;
- }
-
- /**
- * This method sets the CPU% usage
- *
- * @param cpu
- * the CPU% usage
- */
- public void setCpu(int cpu)
- {
- this.cpu = cpu;
- }
-
- /**
- * This method returns the RAM% usage
- *
- * @return
- */
- public int getRam()
- {
- return ram;
- }
-
- /**
- * This method sets the RAM% usage
- *
- * @param ram
- * the RAM% usage
- */
- public void setRam(int ram)
- {
- this.ram = ram;
- }
-
- /**
- * This method returns the HDD% usage
- *
- * @return
- */
- public int getHdd()
- {
- return hdd;
- }
-
- /**
- * This method sets the HDD% usage
- *
- * @param hdd
- * the HDD% usage
- */
- public void setHdd(int hdd)
- {
- this.hdd = hdd;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
deleted file mode 100644
index 2b3bb1c..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.data;
-
-/**
- * This class stores the information about the various software versions, the device id and the OS of a device,
- * together with the time key and day the key was generated for.
- * <p>
- * MachineKey class.
- * </p>
- * Every field is optional and is {@code null} until set; {@link #equals(Object)}, {@link #hashCode()} and
- * {@link #toString()} all accept unset fields. Instances are mutable, so a key must not be modified while it
- * is stored in a hash-based collection.
- *
- * @since 0.3.5
- */
-public class MachineKey
-{
-
-  private Integer customer;
-  private Integer product;
-  private Integer os;
-  private Integer software1;
-  private Integer software2;
-  private Integer software3;
-  private Integer deviceId;
-  private String timeKey;
-  private String day;
-
-  /**
-   * This constructor takes the time key and the day; all other fields stay unset.
-   *
-   * @param timeKey the time key, e.g. hour and minute captured as HHmm
-   * @param day the day when this instance is created
-   */
-  public MachineKey(String timeKey, String day)
-  {
-    this.timeKey = timeKey;
-    this.day = day;
-  }
-
-  /**
-   * This is the default constructor; it leaves every field unset.
-   */
-  public MachineKey()
-  {
-  }
-
-  /**
-   * This constructor initializes every field of the key.
-   *
-   * @param timeKey the time key, e.g. hour and minute captured as HHmm
-   * @param day the day when this instance is created
-   * @param customer the customer Id
-   * @param product product Id
-   * @param os OS version
-   * @param software1 software1 version
-   * @param software2 software2 version
-   * @param software3 software3 version
-   * @param deviceId deviceId
-   */
-  public MachineKey(String timeKey, String day, Integer customer, Integer product, Integer os, Integer software1, Integer software2, Integer software3, Integer deviceId)
-  {
-    this.timeKey = timeKey;
-    this.day = day;
-    this.customer = customer;
-    this.product = product;
-    this.os = os;
-    this.software1 = software1;
-    this.software2 = software2;
-    this.software3 = software3;
-    this.deviceId = deviceId;
-  }
-
-  /**
-   * This method returns the time key, e.g. hour and minute captured as HHmm.
-   *
-   * @return the time key, or null when it was never set
-   */
-  public String getTimeKey()
-  {
-    return timeKey;
-  }
-
-  /**
-   * This method sets the time key, e.g. hour and minute captured as HHmm.
-   *
-   * @param timeKey
-   *          the time key
-   */
-  public void setTimeKey(String timeKey)
-  {
-    this.timeKey = timeKey;
-  }
-
-  /**
-   * This method returns the day of the month when this instance of MachineKey was generated.
-   *
-   * @return the day of the month, or null when it was never set
-   */
-  public String getDay()
-  {
-    return day;
-  }
-
-  /**
-   * This method sets the day of the month when this instance of MachineKey was generated.
-   *
-   * @param day
-   *          the day of the month
-   */
-  public void setDay(String day)
-  {
-    this.day = day;
-  }
-
-  /**
-   * This method returns the customer Id.
-   *
-   * @return the customer Id, or null when it was never set
-   */
-  public Integer getCustomer()
-  {
-    return customer;
-  }
-
-  /**
-   * This method sets the customer Id.
-   *
-   * @param customer
-   *          the customer Id
-   */
-  public void setCustomer(Integer customer)
-  {
-    this.customer = customer;
-  }
-
-  /**
-   * This method returns the product on the device.
-   *
-   * @return the product, or null when it was never set
-   */
-  public Integer getProduct()
-  {
-    return product;
-  }
-
-  /**
-   * This method sets the product on the device.
-   *
-   * @param product
-   *          the value of product
-   */
-  public void setProduct(Integer product)
-  {
-    this.product = product;
-  }
-
-  /**
-   * This method returns the OS version on the device.
-   *
-   * @return the OS version, or null when it was never set
-   */
-  public Integer getOs()
-  {
-    return os;
-  }
-
-  /**
-   * This method sets the OS version on the device.
-   *
-   * @param os
-   *          OS version
-   */
-  public void setOs(Integer os)
-  {
-    this.os = os;
-  }
-
-  /**
-   * This method returns the version of the software1 on the device.
-   *
-   * @return the version of the software1, or null when it was never set
-   */
-  public Integer getSoftware1()
-  {
-    return software1;
-  }
-
-  /**
-   * This method sets the version of the software1 on the device.
-   *
-   * @param software1 the version of the software1
-   */
-  public void setSoftware1(Integer software1)
-  {
-    this.software1 = software1;
-  }
-
-  /**
-   * This method returns the version of the software2 on the device.
-   *
-   * @return the version of the software2, or null when it was never set
-   */
-  public Integer getSoftware2()
-  {
-    return software2;
-  }
-
-  /**
-   * This method sets the version of the software2 on the device.
-   *
-   * @param software2
-   *          the version of the software2
-   */
-  public void setSoftware2(Integer software2)
-  {
-    this.software2 = software2;
-  }
-
-  /**
-   * This method returns the version of the software3 on the device.
-   *
-   * @return the version of the software3, or null when it was never set
-   */
-  public Integer getSoftware3()
-  {
-    return software3;
-  }
-
-  /**
-   * This method sets the version of the software3 on the device.
-   *
-   * @param software3
-   *          the version of the software3
-   */
-  public void setSoftware3(Integer software3)
-  {
-    this.software3 = software3;
-  }
-
-  /**
-   * This method returns the deviceId of the device.
-   *
-   * @return the deviceId, or null when it was never set
-   */
-  public Integer getDeviceId()
-  {
-    return deviceId;
-  }
-
-  /**
-   * This method sets the deviceId of the device.
-   *
-   * @param deviceId
-   *          the deviceId
-   */
-  public void setDeviceId(Integer deviceId)
-  {
-    this.deviceId = deviceId;
-  }
-
-  /**
-   * Computes the hash over all nine fields. Each field that is set raises its own flag bit (31 for customer
-   * down to 23 for day) and is then XOR-ed in; fields that are unset contribute nothing. The resulting values
-   * are unchanged from earlier versions of this class.
-   */
-  @Override
-  public int hashCode()
-  {
-    int key = 0;
-    key = mix(key, 31, customer);
-    key = mix(key, 30, product);
-    key = mix(key, 29, os);
-    key = mix(key, 28, software1);
-    key = mix(key, 27, software2);
-    key = mix(key, 26, software3);
-    key = mix(key, 25, deviceId);
-    key = mix(key, 24, timeKey);
-    key = mix(key, 23, day);
-    return key;
-  }
-
-  /**
-   * Folds one field into the running hash: sets the field's flag bit, then XORs in the field's hash code.
-   * For an Integer the hash code is the int value itself, so this matches XOR-ing the unboxed value.
-   */
-  private static int mix(int key, int flagBit, Object field)
-  {
-    if (field == null) {
-      return key;
-    }
-    return (key | (1 << flagBit)) ^ field.hashCode();
-  }
-
-  /**
-   * Two keys are equal when all nine fields match; an unset field only matches another unset field.
-   */
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof MachineKey)) {
-      return false;
-    }
-    MachineKey mkey = (MachineKey)obj;
-    return sameValue(this.timeKey, mkey.timeKey)
-        && sameValue(this.day, mkey.day)
-        && sameValue(this.customer, mkey.customer)
-        && sameValue(this.product, mkey.product)
-        && sameValue(this.os, mkey.os)
-        && sameValue(this.software1, mkey.software1)
-        && sameValue(this.software2, mkey.software2)
-        && sameValue(this.software3, mkey.software3)
-        && sameValue(this.deviceId, mkey.deviceId);
-  }
-
-  /**
-   * Null-safe equality shared by the Integer and String fields.
-   */
-  private static boolean sameValue(Object a, Object b)
-  {
-    return (a == null) ? (b == null) : a.equals(b);
-  }
-
-  /**
-   * Renders the time key followed by one {@code |index:value} segment per numeric field that is set
-   * (0 customer, 1 product, 2 os, 3-5 software1-3, 6 deviceId). The day is not part of the output.
-   * An unset time key is left out instead of failing, so keys built with the default constructor can be printed.
-   */
-  @Override
-  public String toString()
-  {
-    StringBuilder sb = new StringBuilder();
-    if (timeKey != null) {
-      // new StringBuilder((String)null) throws NullPointerException, hence the explicit check
-      sb.append(timeKey);
-    }
-    appendSegment(sb, 0, customer);
-    appendSegment(sb, 1, product);
-    appendSegment(sb, 2, os);
-    appendSegment(sb, 3, software1);
-    appendSegment(sb, 4, software2);
-    appendSegment(sb, 5, software3);
-    appendSegment(sb, 6, deviceId);
-    return sb.toString();
-  }
-
-  /**
-   * Appends {@code |index:value} to the builder when the value is set.
-   */
-  private static void appendSegment(StringBuilder sb, int index, Integer value)
-  {
-    if (value != null) {
-      sb.append('|').append(index).append(':').append(value);
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
deleted file mode 100644
index d474c5c..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.data;
-
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-
-/**
- * Enumerates the resources whose usage is collected for each device; every constant carries a lower-case
- * description that doubles as its string form.
- * <p>ResourceType class.</p>
- *
- * @since 0.3.5
- */
-public enum ResourceType
-{
-
-  CPU("cpu"), RAM("ram"), HDD("hdd");
-
-  // Reverse index from description to constant, filled once by the static initializer below.
-  private static Map<String, ResourceType> descToResource = Maps.newHashMap();
-
-  static {
-    ResourceType[] all = ResourceType.values();
-    for (int i = 0; i < all.length; i++) {
-      descToResource.put(all[i].desc, all[i]);
-    }
-  }
-
-  private String desc;
-
-  private ResourceType(String desc)
-  {
-    this.desc = desc;
-  }
-
-  /**
-   * @return the description given to the constant, e.g. "cpu" for {@link #CPU}
-   */
-  @Override
-  public String toString()
-  {
-    return this.desc;
-  }
-
-  /**
-   * Looks up the constant registered under the given description.
-   *
-   * @param desc the description, e.g. "cpu"
-   * @return the matching ResourceType, or null when no constant has that description
-   */
-  public static ResourceType getResourceTypeOf(String desc)
-  {
-    ResourceType match = descToResource.get(desc);
-    return match;
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
deleted file mode 100644
index 8f68dab..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.operator;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import javax.validation.constraints.Max;
-import javax.validation.constraints.Min;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.data.ResourceType;
-import com.datatorrent.demos.machinedata.util.DataTable;
-import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Collects the CPU, RAM and HDD readings received for each {@link MachineKey} during a window and, when the
- * window ends, emits the statistics that are switched on (k-th percentile, standard deviation, maximum) on
- * their output ports. A value above the statistic's threshold additionally produces a text alert on
- * {@link #smtpAlert}. The collected readings are discarded at the end of every window.
- * <p>
- * CalculatorOperator class.
- * </p>
- *
- * @since 0.3.5
- */
-public class CalculatorOperator extends BaseOperator
-{
-
-  // Readings of the current window, one list per (machine key, resource) pair.
-  private final DataTable<MachineKey, ResourceType, List<Integer>> data = new DataTable<>();
-
-  @Min(1)
-  @Max(99)
-  private int kthPercentile = 95; // kth percentile
-  private boolean computePercentile;
-  private boolean computeSD;
-  private boolean computeMax;
-
-  private int percentileThreshold = 80;
-  private int sdThreshold = 70;
-  private int maxThreshold = 99;
-
-  public final transient DefaultInputPort<MachineInfo> dataPort = new DefaultInputPort<MachineInfo>()
-  {
-    @Override
-    public void process(MachineInfo tuple)
-    {
-      addDataToCache(tuple);
-    }
-
-    /**
-     * Stream codec used for partitioning.
-     */
-    @Override
-    public StreamCodec<MachineInfo> getStreamCodec()
-    {
-      return new MachineInfoStreamCodec();
-    }
-  };
-
-  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> percentileOutputPort = new DefaultOutputPort<>();
-
-  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Double>>> sdOutputPort = new DefaultOutputPort<>();
-
-  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<ResourceType, Integer>>> maxOutputPort = new DefaultOutputPort<>();
-
-  public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>();
-
-  /**
-   * Appends the tuple's three readings to the lists kept for its machine key, creating the lists on first use.
-   */
-  private void addDataToCache(MachineInfo tuple)
-  {
-    MachineKey machineKey = tuple.getMachineKey();
-    if (!data.containsRow(machineKey)) {
-      for (ResourceType resourceType : ResourceType.values()) {
-        data.put(machineKey, resourceType, Lists.<Integer>newArrayList());
-      }
-    }
-    data.get(machineKey, ResourceType.CPU).add(tuple.getCpu());
-    data.get(machineKey, ResourceType.RAM).add(tuple.getRam());
-    data.get(machineKey, ResourceType.HDD).add(tuple.getHdd());
-  }
-
-  /**
-   * Emits the enabled statistics for every machine key seen in the window, then clears the collected readings.
-   */
-  @Override
-  public void endWindow()
-  {
-    if (computePercentile) {
-      emitPercentiles();
-    }
-    if (computeSD) {
-      emitStandardDeviations();
-    }
-    if (computeMax) {
-      emitMaxima();
-    }
-    data.clear();
-  }
-
-  /**
-   * Sorts each key's readings in place, emits the k-th percentile per resource and raises percentile alerts.
-   */
-  private void emitPercentiles()
-  {
-    for (MachineKey machineKey : data.rowKeySet()) {
-      Map<ResourceType, Double> percentileData = Maps.newHashMap();
-      for (ResourceType resourceType : ResourceType.values()) {
-        List<Integer> readings = data.get(machineKey, resourceType);
-        Collections.sort(readings);
-        percentileData.put(resourceType, getKthPercentile(readings));
-      }
-      percentileOutputPort.emit(new KeyValPair<>(machineKey, percentileData));
-
-      for (Map.Entry<ResourceType, Double> entry : percentileData.entrySet()) {
-        double percentileValue = entry.getValue();
-        if (percentileValue > percentileThreshold) {
-          emitAlert(entry.getKey(), machineKey, percentileValue, percentileThreshold, "Percentile");
-        }
-      }
-    }
-  }
-
-  /**
-   * Emits the standard deviation per resource for each key and raises SD alerts.
-   */
-  private void emitStandardDeviations()
-  {
-    for (MachineKey machineKey : data.rowKeySet()) {
-      Map<ResourceType, Double> sdData = Maps.newHashMap();
-      for (ResourceType resourceType : ResourceType.values()) {
-        sdData.put(resourceType, getSD(data.get(machineKey, resourceType)));
-      }
-      sdOutputPort.emit(new KeyValPair<>(machineKey, sdData));
-
-      for (Map.Entry<ResourceType, Double> entry : sdData.entrySet()) {
-        double sdValue = entry.getValue();
-        if (sdValue > sdThreshold) {
-          emitAlert(entry.getKey(), machineKey, sdValue, sdThreshold, "SD");
-        }
-      }
-    }
-  }
-
-  /**
-   * Emits the largest reading per resource for each key and raises Max alerts.
-   */
-  private void emitMaxima()
-  {
-    for (MachineKey machineKey : data.rowKeySet()) {
-      Map<ResourceType, Integer> maxData = Maps.newHashMap();
-      for (ResourceType resourceType : ResourceType.values()) {
-        maxData.put(resourceType, Collections.max(data.get(machineKey, resourceType)));
-      }
-      maxOutputPort.emit(new KeyValPair<>(machineKey, maxData));
-
-      for (Map.Entry<ResourceType, Integer> entry : maxData.entrySet()) {
-        double maxValue = entry.getValue().doubleValue();
-        if (maxValue > maxThreshold) {
-          emitAlert(entry.getKey(), machineKey, maxValue, maxThreshold, "Max");
-        }
-      }
-    }
-  }
-
-  /**
-   * Sends one alert line on {@link #smtpAlert}.
-   *
-   * @param type the resource whose statistic breached its threshold
-   * @param machineKey the key the statistic was computed for
-   * @param alertVal the breaching value, reported rounded to two decimals
-   * @param threshold the threshold of the statistic that breached; previously the percentile threshold was
-   *          printed for every kind of alert
-   * @param prefix name of the statistic ("Percentile", "SD" or "Max")
-   */
-  private void emitAlert(ResourceType type, MachineKey machineKey, double alertVal, int threshold, String prefix)
-  {
-    BigDecimal decimalVal = new BigDecimal(alertVal);
-    decimalVal = decimalVal.setScale(2, BigDecimal.ROUND_HALF_UP);
-    String alertTime = machineKey.getDay() + machineKey.getTimeKey();
-    smtpAlert.emit(prefix + "-" + type.toString().toUpperCase() + " alert at " + alertTime + " " + type + " usage breached current usage: " + decimalVal.doubleValue() + "% threshold: " + threshold + "%\n\n" + machineKey);
-  }
-
-  /**
-   * Picks the k-th percentile from an ascending list of readings.
-   *
-   * @param sorted the readings in ascending order; must not be empty
-   * @return the mean of the two neighbouring readings when k% of the size is a whole number, otherwise the
-   *         reading at the rounded rank
-   */
-  private double getKthPercentile(List<Integer> sorted)
-  {
-    int last = sorted.size() - 1;
-    // multiply in double so that a very long list cannot overflow the int product
-    double val = (kthPercentile * (double)sorted.size()) / 100.0;
-    if (val == (int)val) {
-      // Whole number
-      int idx = Math.max((int)val - 1, 0);
-      return (sorted.get(idx) + sorted.get(Math.min(idx + 1, last))) / 2.0;
-    }
-    // A rank below 0.5 rounds to 0 and used to give index -1 (e.g. k=30 with one reading); clamp into the list.
-    int idx = Math.min(Math.max((int)Math.round(val) - 1, 0), last);
-    return sorted.get(idx);
-  }
-
-  /**
-   * Computes the population standard deviation of the readings.
-   *
-   * @param data the readings
-   * @return sqrt(sum((x - mean)^2) / n), or 0 for an empty list; the division by n was previously missing
-   */
-  private double getSD(List<Integer> data)
-  {
-    if (data.isEmpty()) {
-      return 0;
-    }
-    long sum = 0;
-    for (int i : data) {
-      sum += i;
-    }
-    double avg = sum / (data.size() * 1.0);
-    double squaredDeviations = 0;
-    for (Integer point : data) {
-      squaredDeviations += Math.pow(point - avg, 2);
-    }
-    return Math.sqrt(squaredDeviations / data.size());
-  }
-
-  /**
-   * @param kVal the percentile which will be emitted by this operator
-   */
-  public void setKthPercentile(int kVal)
-  {
-    this.kthPercentile = kVal;
-  }
-
-  /**
-   * @param doCompute when true percentile will be computed
-   */
-  public void setComputePercentile(boolean doCompute)
-  {
-    this.computePercentile = doCompute;
-  }
-
-  /**
-   * @param doCompute when true standard deviation will be computed
-   */
-  public void setComputeSD(boolean doCompute)
-  {
-    this.computeSD = doCompute;
-  }
-
-  /**
-   * @param doCompute when true max will be computed
-   */
-  public void setComputeMax(boolean doCompute)
-  {
-    this.computeMax = doCompute;
-  }
-
-  /**
-   * @param threshold for percentile when breached will cause alert
-   */
-  public void setPercentileThreshold(int threshold)
-  {
-    this.percentileThreshold = threshold;
-  }
-
-  /**
-   * @param threshold for standard deviation when breached will cause alert
-   */
-  public void setSDThreshold(int threshold)
-  {
-    this.sdThreshold = threshold;
-  }
-
-  /**
-   * @param threshold for Max when breached will cause alert
-   */
-  public void setMaxThreshold(int threshold)
-  {
-    this.maxThreshold = threshold;
-  }
-
-  /**
-   * Codec that derives the partition from the customer, OS, product and three software versions of the
-   * tuple's machine key; device id, time key and day do not take part.
-   */
-  public static class MachineInfoStreamCodec extends KryoSerializableStreamCodec<MachineInfo> implements Serializable
-  {
-    public MachineInfoStreamCodec()
-    {
-      super();
-    }
-
-    @Override
-    public int getPartition(MachineInfo o)
-    {
-      MachineKey key = o.getMachineKey();
-      return Objects.hashCode(key.getCustomer(), key.getOs(), key.getProduct(), key.getSoftware1(), key.getSoftware2(), key.getSoftware3());
-    }
-
-    private static final long serialVersionUID = 201411031403L;
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
deleted file mode 100644
index bbfd547..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.operator;
-
-import java.math.BigDecimal;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-import com.datatorrent.demos.machinedata.data.AverageData;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.lib.util.KeyHashValPair;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * This class calculates the average for various resources across different devices for a given key.
- * For every key received during a window it divides the accumulated CPU, HDD and RAM totals by the tuple
- * count, emits the averages (as strings, together with the day) on {@link #outputPort}, and sends a text
- * alert on {@link #smtpAlert} for each average above the threshold.
- * <p>MachineInfoAveragingOperator class.</p>
- *
- * @since 0.9.0
- */
-@SuppressWarnings("unused")
-public class MachineInfoAveragingOperator extends BaseOperator
-{
-
-  public static final String CPU = "cpu";
-  public static final String RAM = "ram";
-  public static final String HDD = "hdd";
-  public static final String DAY = "day";
-
-  // Totals received in the current window; a later tuple for the same key replaces the earlier one.
-  private final transient Map<MachineKey, AverageData> dataMap = new HashMap<>();
-
-  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>> outputPort = new DefaultOutputPort<>();
-
-  public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>();
-
-  private int threshold = 95;
-
-  /**
-   * Buffer all the tuples as is till end window gets called
-   */
-  public final transient DefaultInputPort<KeyHashValPair<MachineKey, AverageData>> inputPort = new DefaultInputPort<KeyHashValPair<MachineKey, AverageData>>()
-  {
-
-    @Override
-    public void process(KeyHashValPair<MachineKey, AverageData> tuple)
-    {
-      addTuple(tuple);
-    }
-  };
-
-  /**
-   * This method returns the threshold value
-   *
-   * @return the usage above which an alert is sent
-   */
-  public int getThreshold()
-  {
-    return threshold;
-  }
-
-  /**
-   * This method sets the threshold value. If the average usage for any Resource is above this for a given key, then the alert is sent
-   *
-   * @param threshold the threshold value
-   */
-  public void setThreshold(int threshold)
-  {
-    this.threshold = threshold;
-  }
-
-  /**
-   * This adds the given tuple to the dataMap
-   *
-   * @param tuple input tuple
-   */
-  private void addTuple(KeyHashValPair<MachineKey, AverageData> tuple)
-  {
-    MachineKey key = tuple.getKey();
-    dataMap.put(key, tuple.getValue());
-  }
-
-  /**
-   * Emits the averages for every buffered key and clears the buffer.
-   */
-  @Override
-  public void endWindow()
-  {
-    for (Map.Entry<MachineKey, AverageData> entry : dataMap.entrySet()) {
-      MachineKey key = entry.getKey();
-      AverageData totals = entry.getValue();
-      long count = totals.getCount();
-      if (count <= 0) {
-        // nothing was accumulated for this key, so there is no average to report
-        continue;
-      }
-      Map<String, String> averageResult = Maps.newHashMap();
-      // NOTE(review): AverageData is declared elsewhere; its totals look integral, in which case dividing
-      // without the cast truncated the average before it reached the double. Confirm against AverageData.
-      publishAverage(averageResult, CPU, (double)totals.getCpu() / count, key);
-      publishAverage(averageResult, HDD, (double)totals.getHdd() / count, key);
-      publishAverage(averageResult, RAM, (double)totals.getRam() / count, key);
-      averageResult.put(DAY, key.getDay());
-      outputPort.emit(new KeyValPair<>(key, averageResult));
-    }
-    dataMap.clear();
-  }
-
-  /**
-   * Stores one average in the result map under the resource name and raises an alert when it is too high.
-   */
-  private void publishAverage(Map<String, String> averageResult, String resourceType, double average, MachineKey key)
-  {
-    averageResult.put(resourceType, average + "");
-    emitAlert(average, resourceType, key);
-  }
-
-  /**
-   * Sends an alert line on {@link #smtpAlert} when the average is above the threshold; the average is
-   * reported rounded to two decimals.
-   */
-  private void emitAlert(double average, String resourceType, MachineKey key)
-  {
-    if (average > threshold) {
-      BigDecimal bd = new BigDecimal(average);
-      bd = bd.setScale(2, BigDecimal.ROUND_HALF_UP);
-      String stime = key.getDay() + key.getTimeKey();
-      String skey = getKeyInfo(key);
-      smtpAlert.emit(resourceType.toUpperCase() + " alert at " + stime + " " + resourceType + " usage breached current usage: " + bd.doubleValue() + "% threshold: " + threshold + "%\n\n" + skey);
-    }
-  }
-
-  /**
-   * This method is used to artificially generate alerts. When called with true it emits one CPU, one RAM and
-   * one HDD alert for a fixed sample key stamped with the current time, reporting a usage one above the
-   * threshold.
-   *
-   * @param genAlert true to emit the sample alerts; false does nothing (previously the value was ignored and
-   *          the alerts were emitted either way)
-   */
-  public void setGenAlert(boolean genAlert)
-  {
-    if (!genAlert) {
-      return;
-    }
-    Calendar calendar = Calendar.getInstance();
-    long timestamp = System.currentTimeMillis();
-    calendar.setTimeInMillis(timestamp);
-    DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
-    Date date = calendar.getTime();
-    String timeKey = minuteDateFormat.format(date);
-    DateFormat dayDateFormat = new SimpleDateFormat("dd");
-    String day = dayDateFormat.format(date);
-
-    MachineKey alertKey = new MachineKey(timeKey, day);
-    alertKey.setCustomer(1);
-    alertKey.setProduct(5);
-    alertKey.setOs(10);
-    alertKey.setSoftware1(12);
-    alertKey.setSoftware2(14);
-    alertKey.setSoftware3(6);
-
-    MachineInfo machineInfo = new MachineInfo();
-    machineInfo.setMachineKey(alertKey);
-    machineInfo.setCpu(threshold + 1);
-    machineInfo.setRam(threshold + 1);
-    machineInfo.setHdd(threshold + 1);
-
-    // The sample usage used to be built and then dropped, leaving "current % usage:" without a number.
-    String keyInfo = getKeyInfo(machineInfo.getMachineKey());
-    smtpAlert.emit("CPU Alert: CPU Usage threshold (" + threshold + ") breached: current % usage: " + machineInfo.getCpu() + "\n\n" + keyInfo);
-    smtpAlert.emit("RAM Alert: RAM Usage threshold (" + threshold + ") breached: current % usage: " + machineInfo.getRam() + "\n\n" + keyInfo);
-    smtpAlert.emit("HDD Alert: HDD Usage threshold (" + threshold + ") breached: current % usage: " + machineInfo.getHdd() + "\n\n" + keyInfo);
-  }
-
-  /**
-   * This method returns the String for a given MachineKey instance
-   *
-   * @param key MachineKey instance that needs to be converted to string
-   * @return one "name: value" line per field of the key that is set (customer, product, os, software1-3);
-   *         an empty string when the key is null or none of them is set
-   */
-  private String getKeyInfo(MachineKey key)
-  {
-    StringBuilder sb = new StringBuilder();
-    if (key == null) {
-      return sb.toString();
-    }
-    appendLine(sb, "customer: ", key.getCustomer());
-    appendLine(sb, "product version: ", key.getProduct());
-    appendLine(sb, "os version: ", key.getOs());
-    appendLine(sb, "software1 version: ", key.getSoftware1());
-    appendLine(sb, "software2 version: ", key.getSoftware2());
-    appendLine(sb, "software3 version: ", key.getSoftware3());
-    return sb.toString();
-  }
-
-  /**
-   * Appends "label value" and a newline when the value is set.
-   */
-  private static void appendLine(StringBuilder sb, String label, Integer value)
-  {
-    if (value != null) {
-      sb.append(label).append(value).append("\n");
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
deleted file mode 100644
index cb5fa5a..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.demos.machinedata.data.AverageData;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.lib.util.KeyHashValPair;
-
-/**
- * Accumulates, per {@link MachineKey}, the running CPU, RAM and HDD totals and the number of tuples received
- * in a window, and hands those partial results downstream when the window ends.
- * <p> MachineInfoAveragingPrerequisitesOperator class. </p>
- *
- * @since 0.3.5
- */
-public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator
-{
-
-  // Running totals of the current window, keyed by machine key; emptied in endWindow().
-  private Map<MachineKey, AverageData> sums = new HashMap<>();
-
-  public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>>()
-  {
-    /**
-     * Supplies a fresh MachineInfoAveragingUnifier for this port.
-     */
-    @Override
-    public Unifier<KeyHashValPair<MachineKey, AverageData>> getUnifier()
-    {
-      return new MachineInfoAveragingUnifier();
-    }
-  };
-
-  public transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
-  {
-
-    /**
-     * Adds the tuple's readings to the totals of its key, starting a new entry with count 1 for a new key.
-     */
-    @Override
-    public void process(MachineInfo tuple)
-    {
-      MachineKey key = tuple.getMachineKey();
-      AverageData running = sums.get(key);
-      if (running != null) {
-        running.setCpu(running.getCpu() + tuple.getCpu());
-        running.setRam(running.getRam() + tuple.getRam());
-        running.setHdd(running.getHdd() + tuple.getHdd());
-        running.setCount(running.getCount() + 1);
-        return;
-      }
-      AverageData first = new AverageData(tuple.getCpu(), tuple.getHdd(), tuple.getRam(), 1);
-      sums.put(key, first);
-    }
-  };
-
-  /**
-   * Emits one (key, totals) pair per accumulated key while the output port is connected, then drops the totals.
-   */
-  @Override
-  public void endWindow()
-  {
-    for (Map.Entry<MachineKey, AverageData> partial : sums.entrySet()) {
-      if (!outputPort.isConnected()) {
-        continue;
-      }
-      outputPort.emit(new KeyHashValPair<>(partial.getKey(), partial.getValue()));
-    }
-    sums.clear();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
deleted file mode 100644
index e0b67f3..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.Unifier;
-
-import com.datatorrent.demos.machinedata.data.AverageData;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.lib.util.KeyHashValPair;
-
-/**
- * Unifier that merges the per-key {@link AverageData} tuples it receives by adding up their
- * cpu, ram, hdd and count values, and emits one merged tuple per key at the end of each window.
- * <p>MachineInfoAveragingUnifier class.</p>
- *
- * @since 0.9.0
- */
-public class MachineInfoAveragingUnifier implements Unifier<KeyHashValPair<MachineKey, AverageData>>
-{
-
-  // Merged totals per key; filled by process() and emptied by endWindow().
-  private Map<MachineKey, AverageData> sums = new HashMap<>();
-  public final transient DefaultOutputPort<KeyHashValPair<MachineKey, AverageData>> outputPort = new DefaultOutputPort<>();
-
-  /**
-   * No per-window initialisation is needed.
-   *
-   * @param arg0 the window id (unused)
-   */
-  @Override
-  public void beginWindow(long arg0)
-  {
-    // intentionally empty
-  }
-
-  /**
-   * Emits one tuple per key held in {@code sums} and then empties the map.
-   */
-  @Override
-  public void endWindow()
-  {
-    for (Map.Entry<MachineKey, AverageData> merged : sums.entrySet()) {
-      final MachineKey key = merged.getKey();
-      final AverageData totals = merged.getValue();
-      outputPort.emit(new KeyHashValPair<>(key, totals));
-    }
-    sums.clear();
-  }
-
-  /**
-   * No setup work is needed.
-   *
-   * @param arg0 the operator context (unused)
-   */
-  @Override
-  public void setup(OperatorContext arg0)
-  {
-    // intentionally empty
-  }
-
-  /**
-   * No resources to release.
-   */
-  @Override
-  public void teardown()
-  {
-    // intentionally empty
-  }
-
-  /**
-   * Merges one incoming partial aggregate into the totals for its key.
-   *
-   * @param arg0 pair of machine key and the partial totals to merge
-   */
-  @Override
-  public void process(KeyHashValPair<MachineKey, AverageData> arg0)
-  {
-    final MachineKey key = arg0.getKey();
-    final AverageData incoming = arg0.getValue();
-    final AverageData merged = sums.get(key);
-    if (merged != null) {
-      merged.setCpu(merged.getCpu() + incoming.getCpu());
-      merged.setRam(merged.getRam() + incoming.getRam());
-      merged.setHdd(merged.getHdd() + incoming.getHdd());
-      merged.setCount(merged.getCount() + incoming.getCount());
-      return;
-    }
-    // First tuple for this key: the incoming object itself is stored (not a copy), so later
-    // merges for the same key update that object in place.
-    sums.put(key, incoming);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
deleted file mode 100644
index 6c4256a..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.util;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Generate combinations of elements for the given array of elements.
- *
- * The number of combinations of size r out of n elements is nCr = n! / (r! * (n-r)!).
- * Combinations are built in ascending index order and stored in a map keyed by a
- * 1-based sequence number.
- *
- * @since 0.3.5
- */
-public class Combinatorics<T>
-{
-
-  private T[] values;
-  // Requested combination size; -1 means "every size from 1 to values.length".
-  private int size = -1;
-  // Combination currently being assembled.
-  private List<T> result;
-  private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>();
-  // Sequence number of the last combination stored in resultMap.
-  private int resultMapSize = 0;
-
-  /**
-   * Generates all possible combinations with all the sizes.
-   *
-   * @param values the elements to combine
-   */
-  public Combinatorics(T[] values)
-  {
-    this.values = values;
-    this.size = -1;
-    this.result = new ArrayList<>();
-  }
-
-  /**
-   * Generates all possible combinations with the given size.
-   *
-   * @param values the elements to combine
-   * @param size the number of elements in each combination
-   */
-  public Combinatorics(T[] values, int size)
-  {
-    this.values = values;
-    this.size = size;
-    this.result = new ArrayList<>();
-  }
-
-  /**
-   * Builds the combinations requested at construction time.
-   *
-   * Every call starts from a clean state, so calling this method repeatedly yields equal maps
-   * instead of appending duplicates to the previous result.
-   *
-   * @return map from 1-based sequence number to combination
-   */
-  public Map<Integer, List<T>> generate()
-  {
-    resultMap = new HashMap<>();
-    resultMapSize = 0;
-    result = new ArrayList<>();
-
-    if (size == -1) {
-      // The sentinel is deliberately left untouched; overwriting it with values.length would
-      // make a later call produce only the full-length combination.
-      for (int length = 1; length <= values.length; length++) {
-        generateCombinations(0, 0, newIndexBuffer(length));
-      }
-    } else {
-      generateCombinations(0, 0, newIndexBuffer(size));
-    }
-    return resultMap;
-  }
-
-  /**
-   * Recursively fills {@code tmp} with ascending indices into the value array and records a
-   * combination each time the buffer is full.
-   *
-   * @param start first value index that may be placed at position {@code depth}
-   * @param depth position in {@code tmp} to fill next
-   * @param tmp index buffer whose length is the combination size
-   */
-  public void generateCombinations(int start, int depth, int[] tmp)
-  {
-    if (depth == tmp.length) {
-      for (int j = 0; j < depth; j++) {
-        result.add(values[tmp[j]]);
-      }
-      resultMap.put(++resultMapSize, result);
-      result = new ArrayList<>();
-      return;
-    }
-    for (int i = start; i < values.length; i++) {
-      tmp[depth] = i;
-      generateCombinations(i + 1, depth + 1, tmp);
-    }
-  }
-
-  // Creates an index buffer of the given length with every slot set to -1.
-  private static int[] newIndexBuffer(int length)
-  {
-    int[] buffer = new int[length];
-    Arrays.fill(buffer, -1);
-    return buffer;
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
deleted file mode 100644
index f8f2d33..0000000
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata.util;
-
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.collect.Maps;
-
-/**
- * Two-level table that stores one entry per (row key, column key) pair, backed by a map of
- * row key to a per-row map of column key to entry.
- * <p>DataTable class.</p>
- *
- * @since 0.3.5
- */
-public class DataTable<R,C,E>
-{
-
-  //machineKey, [cpu,ram,hdd] -> value
-  private final Map<R,Map<C,E>> table = Maps.newHashMap();
-
-  /**
-   * @param rowKey the row to look for
-   * @return true when the table holds a row for {@code rowKey}
-   */
-  public boolean containsRow(R rowKey)
-  {
-    return table.containsKey(rowKey);
-  }
-
-  /**
-   * Stores {@code entry} under the given row and column, creating the row when it is missing.
-   *
-   * @param rowKey the row to write to
-   * @param colKey the column to write to
-   * @param entry the value to store
-   */
-  public void put(R rowKey,C colKey, E entry)
-  {
-    final Map<C,E> row;
-    if (containsRow(rowKey)) {
-      row = table.get(rowKey);
-    } else {
-      row = Maps.<C,E>newHashMap();
-      table.put(rowKey, row);
-    }
-    row.put(colKey, entry);
-  }
-
-  /**
-   * @param rowKey the row to read from
-   * @param colKey the column to read from
-   * @return the stored entry, or null when the row is absent or has no value for the column
-   */
-  @Nullable
-  public E get(R rowKey, C colKey)
-  {
-    return containsRow(rowKey) ? table.get(rowKey).get(colKey) : null;
-  }
-
-  /**
-   * @return the key set of the backing map
-   */
-  public Set<R> rowKeySet()
-  {
-    return table.keySet();
-  }
-
-  /**
-   * Removes every row from the table.
-   */
-  public void clear()
-  {
-    table.clear();
-  }
-
-  /**
-   * @return the backing map itself (not a copy)
-   */
-  public Map<R,Map<C,E>> getTable()
-  {
-    return table;
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/resources/META-INF/properties.xml b/demos/machinedata/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index afe8783..0000000
--- a/demos/machinedata/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,139 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <property>
- <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
- <value>1000</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Receiver.attr.APPLICATION_WINDOW_COUNT
- </name>
- <value>5</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Receiver.attr.PARTITIONER</name>
- <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.DimensionsGenerator.attr.APPLICATION_WINDOW_COUNT
- </name>
- <value>5</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.stream.Events.locality
- </name>
- <value>CONTAINER_LOCAL</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.DimensionsGenerator.inputport.inputPort.attr.PARTITION_PARALLEL
- </name>
- <value>true</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Aggregator.inputport.inputPort.attr.PARTITION_PARALLEL
- </name>
- <value>true</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Aggregator.attr.APPLICATION_WINDOW_COUNT
- </name>
- <value>5</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.AverageCalculator.attr.APPLICATION_WINDOW_COUNT
- </name>
- <value>5</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Persister.inputport.input.attr.PARTITION_PARALLEL
- </name>
- <value>true</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Persister.attr.APPLICATION_WINDOW_COUNT
- </name>
- <value>5</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Persister.store.dbIndex
- </name>
- <value>2</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Persister.store.host
- </name>
- <value></value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Persister.store.port
- </name>
- <value>6379</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.port.*.attr.QUEUE_CAPACITY</name>
- <value>32000</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Alerter.from
- </name>
- <value></value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Alerter.subject
- </name>
- <value>Alert!!!</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Alerter.recipients.TO
- </name>
- <value></value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Alerter.content
- </name>
- <value>{}</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Alerter.smtpHost
- </name>
- <value>localhost</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Alerter.smtpPort
- </name>
- <value>25</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Alerter.useSsl
- </name>
- <value>false</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.operator.Aggregator.outputport.outputPort.attr.UNIFIER_LIMIT
- </name>
- <value>8</value>
- </property>
- <property>
- <name>dt.application.MachineDataDemo.stream.DimensionalData.locality
- </name>
- <value>THREAD_LOCAL</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/site/conf/my-app-conf1.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/site/conf/my-app-conf1.xml b/demos/machinedata/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index f35873b..0000000
--- a/demos/machinedata/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>1024</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java b/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
deleted file mode 100644
index 0e397be..0000000
--- a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.machinedata;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.data.ResourceType;
-import com.datatorrent.demos.machinedata.operator.CalculatorOperator;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.lib.util.TimeBucketKey;
-
-/**
- * @since 0.3.5
- */
-public class CalculatorOperatorTest
-{
- private static DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
- private static Logger LOG = LoggerFactory.getLogger(CalculatorOperatorTest.class);
-
- /**
- * Test node logic emits correct results
- */
- @Test
- public void testNodeProcessing() throws Exception
- {
- CalculatorOperator calculatorOperator = new CalculatorOperator();
- calculatorOperator.setup(null);
-
- calculatorOperator.setComputePercentile(true);
- calculatorOperator.setComputeMax(true);
- calculatorOperator.setComputeSD(true);
-
- testPercentile(calculatorOperator);
- }
-
- public void testPercentile(CalculatorOperator oper)
- {
-
- CollectorTestSink sortSink = new CollectorTestSink();
- oper.percentileOutputPort.setSink(sortSink);
- oper.setKthPercentile(50);
- Calendar calendar = Calendar.getInstance();
- Date date = calendar.getTime();
- String timeKey = minuteDateFormat.format(date);
- String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
-
- Integer vs = new Integer(1);
- MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
-
- oper.beginWindow(0);
-
- MachineInfo info = new MachineInfo(mk, 1, 1, 1);
- oper.dataPort.process(info);
-
- info.setCpu(2);
- oper.dataPort.process(info);
-
- info.setCpu(3);
- oper.dataPort.process(info);
-
- oper.endWindow();
-
- Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
- for (Object o : sortSink.collectedTuples) {
- LOG.debug(o.toString());
- KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
- Assert.assertEquals("emitted value for 'cpu' was ", 2.0, keyValPair.getValue().get(ResourceType.CPU), 0);
- Assert.assertEquals("emitted value for 'hdd' was ", 1.0, keyValPair.getValue().get(ResourceType.HDD), 0);
- Assert.assertEquals("emitted value for 'ram' was ", 1.0, keyValPair.getValue().get(ResourceType.RAM), 0);
-
- }
- LOG.debug("Done percentile testing\n");
-
- }
-
- public void testStandarDeviation(CalculatorOperator oper)
- {
- CollectorTestSink sortSink = new CollectorTestSink();
- oper.sdOutputPort.setSink(sortSink);
- Calendar calendar = Calendar.getInstance();
- Date date = calendar.getTime();
- String timeKey = minuteDateFormat.format(date);
- String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
-
- Integer vs = new Integer(1);
- MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
-
- oper.beginWindow(0);
-
- MachineInfo info = new MachineInfo(mk, 1, 1, 1);
- oper.dataPort.process(info);
-
- info.setCpu(2);
- oper.dataPort.process(info);
-
- info.setCpu(3);
- oper.dataPort.process(info);
-
- oper.endWindow();
-
- Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
- for (Object o : sortSink.collectedTuples) {
- LOG.debug(o.toString());
- KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
- Assert.assertEquals("emitted value for 'cpu' was ", getSD(ImmutableList.of(1, 2, 3)), keyValPair.getValue().get(ResourceType.CPU), 0);
- Assert.assertEquals("emitted value for 'hdd' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.HDD), 0);
- Assert.assertEquals("emitted value for 'ram' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.RAM), 0);
-
- }
- LOG.debug("Done sd testing\n");
-
- }
-
- private final double getSD(List<Integer> input)
- {
- int sum = 0;
- for (int i : input) {
- sum += i;
- }
- double avg = sum / (input.size() * 1.0);
- double sd = 0;
- for (Integer point : input) {
- sd += Math.pow(point - avg, 2);
- }
- return Math.sqrt(sd);
- }
-
- public void testMax(CalculatorOperator oper)
- {
- CollectorTestSink sortSink = new CollectorTestSink();
- oper.maxOutputPort.setSink(sortSink);
- Calendar calendar = Calendar.getInstance();
- Date date = calendar.getTime();
- String timeKey = minuteDateFormat.format(date);
- String day = calendar.get(Calendar.DAY_OF_MONTH) + "";
-
- Integer vs = new Integer(1);
- MachineKey mk = new MachineKey(timeKey, day, vs, vs, vs, vs, vs, vs, vs);
-
- oper.beginWindow(0);
-
- MachineInfo info = new MachineInfo(mk, 1, 1, 1);
- oper.dataPort.process(info);
-
- info.setCpu(2);
- oper.dataPort.process(info);
-
- info.setCpu(3);
- oper.dataPort.process(info);
-
- oper.endWindow();
-
- Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
- for (Object o : sortSink.collectedTuples) {
- LOG.debug(o.toString());
- KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
- Assert.assertEquals("emitted value for 'cpu' was ", 3, keyValPair.getValue().get(ResourceType.CPU), 0);
- Assert.assertEquals("emitted value for 'hdd' was ", 1, keyValPair.getValue().get(ResourceType.HDD), 0);
- Assert.assertEquals("emitted value for 'ram' was ", 1, keyValPair.getValue().get(ResourceType.RAM), 0);
-
- }
- LOG.debug("Done max testing\n");
-
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/machinedata/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/test/resources/log4j.properties b/demos/machinedata/src/test/resources/log4j.properties
deleted file mode 100644
index cf0d19e..0000000
--- a/demos/machinedata/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug