You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2016/09/15 13:37:51 UTC
incubator-metron git commit: METRON-411 Support Greater Range of Profile Periods (nickwallen) closes apache/incubator-metron#246
Repository: incubator-metron
Updated Branches:
refs/heads/master 5e86bc3dd -> baf0d24a4
METRON-411 Support Greater Range of Profile Periods (nickwallen) closes apache/incubator-metron#246
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/baf0d24a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/baf0d24a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/baf0d24a
Branch: refs/heads/master
Commit: baf0d24a45e865ec3aafc128d657e0a7b69260af
Parents: 5e86bc3
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Sep 15 09:36:49 2016 -0400
Committer: Nick Allen <ni...@nickallen.org>
Committed: Thu Sep 15 09:36:49 2016 -0400
----------------------------------------------------------------------
.../metron/profiler/client/GetProfileTest.java | 17 +-
.../client/HBaseProfilerClientTest.java | 16 +-
.../metron/profiler/client/ProfileWriter.java | 6 +-
.../src/main/config/profiler.properties | 3 +-
.../src/main/flux/profiler/remote.yaml | 10 +-
.../metron/profiler/ProfileMeasurement.java | 33 +-
.../apache/metron/profiler/ProfilePeriod.java | 150 ++------
.../profiler/bolt/ProfileBuilderBolt.java | 24 +-
.../metron/profiler/hbase/RowKeyBuilder.java | 14 +-
.../profiler/hbase/SaltyRowKeyBuilder.java | 31 +-
.../metron/profiler/ProfilePeriodTest.java | 343 ++++---------------
.../profiler/bolt/ProfileBuilderBoltTest.java | 3 +-
.../profiler/bolt/ProfileHBaseMapperTest.java | 3 +-
.../profiler/hbase/SaltyRowKeyBuilderTest.java | 51 +--
.../integration/ProfilerIntegrationTest.java | 3 +-
15 files changed, 192 insertions(+), 515 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
index e68d52d..dbecca0 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
@@ -114,6 +114,8 @@ public class GetProfileTest {
*/
@Test
public void testWithNoGroups() {
+ final long periodDuration = 15;
+ final TimeUnit periodUnits = TimeUnit.MINUTES;
final int periodsPerHour = 4;
final int expectedValue = 2302;
final int hours = 2;
@@ -122,7 +124,7 @@ public class GetProfileTest {
// setup - write some measurements to be read later
final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
profileWriter.write(m, count, group, val -> expectedValue);
// execute - read the profile values - no groups
@@ -138,6 +140,8 @@ public class GetProfileTest {
*/
@Test
public void testWithOneGroup() {
+ final long periodDuration = 15;
+ final TimeUnit periodUnits = TimeUnit.MINUTES;
final int periodsPerHour = 4;
final int expectedValue = 2302;
final int hours = 2;
@@ -146,7 +150,7 @@ public class GetProfileTest {
// setup - write some measurements to be read later
final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
profileWriter.write(m, count, group, val -> expectedValue);
// create a variable that contains the groups to use
@@ -165,6 +169,8 @@ public class GetProfileTest {
*/
@Test
public void testWithTwoGroups() {
+ final long periodDuration = 15;
+ final TimeUnit periodUnits = TimeUnit.MINUTES;
final int periodsPerHour = 4;
final int expectedValue = 2302;
final int hours = 2;
@@ -173,7 +179,7 @@ public class GetProfileTest {
// setup - write some measurements to be read later
final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
profileWriter.write(m, count, group, val -> expectedValue);
// create a variable that contains the groups to use
@@ -211,14 +217,15 @@ public class GetProfileTest {
*/
@Test
public void testOutsideTimeHorizon() {
- final int periodsPerHour = 4;
+ final long periodDuration = 15;
+ final TimeUnit periodUnits = TimeUnit.MINUTES;
final int expectedValue = 2302;
final int hours = 2;
final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
final List<Object> group = Collections.emptyList();
// setup - write a single value from 2 hours ago
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
profileWriter.write(m, 1, group, val -> expectedValue);
// create a variable that contains the groups to use
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
index 087bffa..0da3f31 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -104,14 +104,18 @@ public class HBaseProfilerClientTest {
*/
@Test
public void testFetchOneGroup() throws Exception {
+
+ // setup
+ final long periodDuration = 15;
+ final TimeUnit periodUnits = TimeUnit.MINUTES;
final int periodsPerHour = 4;
final int expectedValue = 2302;
final int hours = 2;
final int count = hours * periodsPerHour;
final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- // setup - write two groups of measurements - 'weekends' and 'weekdays'
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ // create two groups of measurements - one on weekdays and one on weekends
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
@@ -130,6 +134,8 @@ public class HBaseProfilerClientTest {
public void testFetchNoGroup() {
// setup
+ final long periodDuration = 15;
+ final TimeUnit periodUnits = TimeUnit.MINUTES;
final int periodsPerHour = 4;
final int expectedValue = 2302;
final int hours = 2;
@@ -137,7 +143,7 @@ public class HBaseProfilerClientTest {
final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
// create two groups of measurements - one on weekdays and one on weekends
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
@@ -155,6 +161,8 @@ public class HBaseProfilerClientTest {
*/
@Test
public void testFetchOutsideTimeWindow() throws Exception {
+ final long periodDuration = 15;
+ final TimeUnit periodUnits = TimeUnit.MINUTES;
final int periodsPerHour = 4;
final int hours = 2;
int numberToWrite = hours * periodsPerHour;
@@ -162,7 +170,7 @@ public class HBaseProfilerClientTest {
final long startTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
// setup - write some values to read later
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
profileWriter.write(m, numberToWrite, group, val -> 1000);
// execute
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index 95be44c..db2f8da 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -31,6 +31,7 @@ import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.storm.hbase.common.ColumnList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
@@ -68,8 +69,9 @@ public class ProfileWriter {
m = new ProfileMeasurement(
prototype.getProfileName(),
prototype.getEntity(),
- next.getTimeInMillis(),
- prototype.getPeriodsPerHour());
+ next.getStartTimeMillis(),
+ prototype.getPeriod().getDurationMillis(),
+ TimeUnit.MILLISECONDS);
// generate the next value that should be written
Object nextValue = valueGenerator.apply(m.getValue());
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index 4a9cdb4..d2b8544 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -23,7 +23,8 @@
profiler.workers=1
profiler.executors=0
profiler.input.topic=indexing
-profiler.periods.per.hour=4
+profiler.period.duration=15
+profiler.period.duration.units=MINUTES
profiler.hbase.salt.divisor=1000
profiler.hbase.table=profiler
profiler.hbase.column.family=P
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 835c609..db97392 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -30,8 +30,9 @@ components:
properties:
- name: "saltDivisor"
value: ${profiler.hbase.salt.divisor}
- - name: "periodsPerHour"
- value: ${profiler.periods.per.hour}
+ configMethods:
+ - name: "withPeriodDuration"
+ args: [${profiler.period.duration}, "${profiler.period.duration.units}"]
- id: "columnBuilder"
className: "org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder"
@@ -93,8 +94,9 @@ bolts:
properties:
- name: "executor"
ref: "defaultExecutor"
- - name: "periodsPerHour"
- value: ${profiler.periods.per.hour}
+ configMethods:
+ - name: "withPeriodDuration"
+ args: [${profiler.period.duration}, "${profiler.period.duration.units}"]
- id: "hbaseBolt"
className: "org.apache.metron.hbase.bolt.HBaseBolt"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index 0c94879..210b92f 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -21,6 +21,7 @@
package org.apache.metron.profiler;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* Represents a single data point within a Profile.
@@ -51,26 +52,21 @@ public class ProfileMeasurement {
private List<String> groupBy;
/**
- * The number of profile periods per hour.
- */
- private int periodsPerHour;
-
- /**
* The period in which the ProfileMeasurement was taken.
*/
private ProfilePeriod period;
/**
* @param profileName The name of the profile.
- * @param entity The name of the entity.
- * @param epochMillis The timestamp when the measurement period has been started in milliseconds since the epoch.
- * @param periodsPerHour The number of profile periods per hour.
+ * @param entity The name of the entity being profiled.
+ * @param whenMillis When the measurement was taken in epoch milliseconds.
+ * @param periodDuration The duration of each profile period.
+ * @param periodUnits The units of the duration of each profile period.
*/
- public ProfileMeasurement(String profileName, String entity, long epochMillis, int periodsPerHour) {
+ public ProfileMeasurement(String profileName, String entity, long whenMillis, long periodDuration, TimeUnit periodUnits) {
this.profileName = profileName;
this.entity = entity;
- this.periodsPerHour = periodsPerHour;
- this.period = new ProfilePeriod(epochMillis, periodsPerHour);
+ this.period = new ProfilePeriod(whenMillis, periodDuration, periodUnits);
}
public String getProfileName() {
@@ -85,10 +81,6 @@ public class ProfileMeasurement {
return value;
}
- public void setValue(Object value) {
- this.value = value;
- }
-
public ProfilePeriod getPeriod() {
return period;
}
@@ -97,12 +89,12 @@ public class ProfileMeasurement {
return groupBy;
}
- public void setGroupBy(List<String> groupBy) {
- this.groupBy = groupBy;
+ public void setValue(Object value) {
+ this.value = value;
}
- public int getPeriodsPerHour() {
- return periodsPerHour;
+ public void setGroupBy(List<String> groupBy) {
+ this.groupBy = groupBy;
}
@Override
@@ -111,7 +103,6 @@ public class ProfileMeasurement {
if (o == null || getClass() != o.getClass()) return false;
ProfileMeasurement that = (ProfileMeasurement) o;
- if (periodsPerHour != that.periodsPerHour) return false;
if (profileName != null ? !profileName.equals(that.profileName) : that.profileName != null) return false;
if (entity != null ? !entity.equals(that.entity) : that.entity != null) return false;
if (value != null ? !value.equals(that.value) : that.value != null) return false;
@@ -126,7 +117,6 @@ public class ProfileMeasurement {
result = 31 * result + (entity != null ? entity.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
result = 31 * result + (groupBy != null ? groupBy.hashCode() : 0);
- result = 31 * result + periodsPerHour;
result = 31 * result + (period != null ? period.hashCode() : 0);
return result;
}
@@ -138,7 +128,6 @@ public class ProfileMeasurement {
", entity='" + entity + '\'' +
", value=" + value +
", groupBy=" + groupBy +
- ", periodsPerHour=" + periodsPerHour +
", period=" + period +
'}';
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
index 648eb28..1b8efc8 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
@@ -20,12 +20,8 @@
package org.apache.metron.profiler;
-import java.util.Calendar;
-import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
-import static java.lang.String.format;
-
/**
* The Profiler captures a ProfileMeasurement once every ProfilePeriod. There can be
* multiple ProfilePeriods every hour.
@@ -33,133 +29,47 @@ import static java.lang.String.format;
public class ProfilePeriod {
/**
- * The year.
- */
- private int year;
-
- /**
- * Day of the year; [1, 366]
+ * A monotonically increasing number identifying the period. The first period is 0
+ * and began at the epoch.
*/
- private int dayOfYear;
+ private long period;
/**
- * Hour of the day; [0, 23]
+ * The duration of each period in milliseconds.
*/
- private int hour;
-
- /**
- * The period within the hour; [0, periodsPerHour)
- */
- private int period;
-
- /**
- * The number of periods per hour. This value must be a divisor or multiple
- * of 60; 1, 2, 4, 6, 240, etc.
- */
- private int periodsPerHour;
-
- /**
- * The actual time used to initialize the ProfilePeriod. This value should not be
- * used for anything other than troubleshooting.
- */
- private long epochMillis;
+ private long durationMillis;
/**
* @param epochMillis A timestamp contained somewhere within the profile period.
- * @param periodsPerHour The number of periods per hour. Must be a divisor or multiple
- * of 60; 1, 2, 4, 6, 240, etc.
+ * @param duration The duration of each profile period.
+ * @param units The units of the duration; hours, minutes, etc.
*/
- public ProfilePeriod(long epochMillis, int periodsPerHour) {
-
- // periods per hour must be a divisor or multiple of 60
- if(60 % periodsPerHour != 0 && periodsPerHour % 60 != 0) {
- throw new RuntimeException(format("invalid periodsPerHour: expected=divisor/multiple of 60, actual=%d", periodsPerHour));
- }
-
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- cal.setTimeInMillis(epochMillis);
-
- this.periodsPerHour = periodsPerHour;
- this.period = findPeriod(cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND), periodsPerHour);
- this.hour = cal.get(Calendar.HOUR_OF_DAY);
- this.dayOfYear = cal.get(Calendar.DAY_OF_YEAR);
- this.year = cal.get(Calendar.YEAR);
- this.epochMillis = epochMillis;
+ public ProfilePeriod(long epochMillis, long duration, TimeUnit units) {
+ this.durationMillis = units.toMillis(duration);
+ this.period = epochMillis / durationMillis;
}
/**
- * Returns the next ProfilePeriod in time.
+ * When this period started in milliseconds since the epoch.
*/
- public ProfilePeriod next() {
- long nextMillis = this.getTimeInMillis() + millisPerPeriod(periodsPerHour);
- return new ProfilePeriod(nextMillis, periodsPerHour);
+ public long getStartTimeMillis() {
+ return period * durationMillis;
}
/**
- * @return The time in milliseconds since the epoch.
+ * Returns the next ProfilePeriod in time.
*/
- public long getTimeInMillis() {
-
- int millisPastHour = (int) millisPerPeriod(periodsPerHour) * period;
-
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- cal.set(Calendar.YEAR, year);
- cal.set(Calendar.DAY_OF_YEAR, dayOfYear);
- cal.set(Calendar.HOUR_OF_DAY, hour);
- cal.set(Calendar.MINUTE, (millisPastHour / 1000) / 60);
- cal.set(Calendar.SECOND, (millisPastHour / 1000) % 60);
- cal.set(Calendar.MILLISECOND, 0);
-
- return cal.getTimeInMillis();
- }
-
- public int getYear() {
- return year;
- }
-
- public int getDayOfYear() {
- return dayOfYear;
- }
-
- public int getHour() {
- return hour;
+ public ProfilePeriod next() {
+ long nextStart = getStartTimeMillis() + durationMillis;
+ return new ProfilePeriod(nextStart, durationMillis, TimeUnit.MILLISECONDS);
}
- public int getPeriod() {
+ public long getPeriod() {
return period;
}
- public int getPeriodsPerHour() {
- return periodsPerHour;
- }
-
- /**
- * Determines the period within the hour based on the minutes/seconds on the clock.
- *
- * @param minutes The minute within the hour; 0-59.
- * @param seconds The second within the minute; 0-59.
- * @return The period within the hour.
- */
- private static int findPeriod(int minutes, int seconds, int periodsPerHour) {
- final int secondsInHour = minutes * 60 + seconds;
- return (int) (secondsInHour / secondsPerPeriod(periodsPerHour));
- }
-
- /**
- * The number of seconds in each period.
- * @param periodsPerHour The number of periods per hour.
- */
- private static double secondsPerPeriod(int periodsPerHour) {
- return millisPerPeriod(periodsPerHour) / 1000L;
- }
-
- /**
- * The number of milliseconds in each period.
- * @param periodsPerHour The number of periods per hour.
- */
- private static long millisPerPeriod(int periodsPerHour) {
- final long millisPerHour = 60L * 60L * 1000L;
- return millisPerHour / periodsPerHour;
+ public long getDurationMillis() {
+ return durationMillis;
}
@Override
@@ -168,32 +78,22 @@ public class ProfilePeriod {
if (o == null || getClass() != o.getClass()) return false;
ProfilePeriod that = (ProfilePeriod) o;
-
- if (year != that.year) return false;
- if (dayOfYear != that.dayOfYear) return false;
- if (hour != that.hour) return false;
if (period != that.period) return false;
- return periodsPerHour == that.periodsPerHour;
+ return durationMillis == that.durationMillis;
}
@Override
public int hashCode() {
- int result = year;
- result = 31 * result + dayOfYear;
- result = 31 * result + hour;
- result = 31 * result + period;
- result = 31 * result + periodsPerHour;
+ int result = (int) (period ^ (period >>> 32));
+ result = 31 * result + (int) (durationMillis ^ (durationMillis >>> 32));
return result;
}
@Override
public String toString() {
return "ProfilePeriod{" +
- "year=" + year +
- ", dayOfYear=" + dayOfYear +
- ", hour=" + hour +
- ", period=" + period +
- ", periodsPerHour=" + periodsPerHour +
+ "period=" + period +
+ ", durationMillis=" + durationMillis +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 2cd7182..de8561b 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
@@ -64,10 +65,9 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
private StellarExecutor executor;
/**
- * The number of times per hour that a profile is flushed and a measurement
- * is written. This should be a divisor or multiple of 60; 1, 2, 3, 4, 6, 240, etc.
+ * The duration of each profile period in milliseconds.
*/
- private int periodsPerHour;
+ private long periodDurationMillis;
/**
* A ProfileMeasurement is created and emitted each window period. A Profile
@@ -100,12 +100,9 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
*/
@Override
public Map<String, Object> getComponentConfiguration() {
- Config conf = new Config();
-
// how frequently should the bolt receive tick tuples?
- long freqInSeconds = ((60 * 60) / periodsPerHour);
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, freqInSeconds);
-
+ Config conf = new Config();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TimeUnit.MILLISECONDS.toSeconds(periodDurationMillis));
return conf;
}
@@ -186,7 +183,8 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
profileConfig.getProfile(),
input.getStringByField("entity"),
getTimestamp(),
- periodsPerHour);
+ periodDurationMillis,
+ TimeUnit.MILLISECONDS);
// execute the 'init' expression
try {
@@ -294,9 +292,11 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
this.executor = executor;
}
- public void setPeriodsPerHour(int periodsPerHour) {
- this.periodsPerHour = periodsPerHour;
+ public void setPeriodDurationMillis(long periodDurationMillis) {
+ this.periodDurationMillis = periodDurationMillis;
}
-
+ public void withPeriodDuration(int duration, TimeUnit units) {
+ setPeriodDurationMillis(units.toMillis(duration));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
index bb8cc80..5223ee6 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
@@ -34,11 +34,11 @@ public interface RowKeyBuilder extends Serializable {
/**
* Build a row key for a given ProfileMeasurement.
- *
+ * <p>
* This method is useful when writing ProfileMeasurements to HBase.
*
* @param measurement The profile measurement.
- * @param groups The groups used to sort the profile data.
+ * @param groups The groups used to sort the profile data.
* @return The HBase row key.
*/
byte[] rowKey(ProfileMeasurement measurement, List<Object> groups);
@@ -46,14 +46,14 @@ public interface RowKeyBuilder extends Serializable {
/**
* Builds a list of row keys necessary to retrieve a profile's measurements over
* a time horizon.
- *
+ * <p>
* This method is useful when attempting to read ProfileMeasurements stored in HBase.
*
- * @param profile The name of the profile.
- * @param entity The name of the entity.
- * @param groups The group(s) used to sort the profile data.
+ * @param profile The name of the profile.
+ * @param entity The name of the entity.
+ * @param groups The group(s) used to sort the profile data.
* @param durationAgo How long ago?
- * @param unit The time units of how long ago.
+ * @param unit The time units of how long ago.
* @return All of the row keys necessary to retrieve the profile measurements.
*/
List<byte[]> rowKeys(String profile, String entity, List<Object> groups, long durationAgo, TimeUnit unit);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
index e78a8ea..d1c48f8 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
@@ -56,19 +56,18 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
private int saltDivisor;
/**
- * An hour is divided into multiple periods. This defines how many periods
- * will exist within each given hour.
+ * The duration of each profile period in milliseconds.
*/
- private int periodsPerHour;
+ private long periodDurationMillis;
public SaltyRowKeyBuilder() {
this.saltDivisor = 1000;
- this.periodsPerHour = 4;
+ this.periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
}
- public SaltyRowKeyBuilder(int saltDivisor, int periodsPerHour) {
+ public SaltyRowKeyBuilder(int saltDivisor, long duration, TimeUnit units) {
this.saltDivisor = saltDivisor;
- this.periodsPerHour = periodsPerHour;
+ this.periodDurationMillis = units.toMillis(duration);
}
/**
@@ -91,8 +90,8 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
long startTime = endTime - unit.toMillis(durationAgo);
// find the starting period and advance until the end time is reached
- ProfilePeriod period = new ProfilePeriod(startTime, periodsPerHour);
- while(period.getTimeInMillis() <= endTime) {
+ ProfilePeriod period = new ProfilePeriod(startTime, periodDurationMillis, TimeUnit.MILLISECONDS);
+ while(period.getStartTimeMillis() <= endTime) {
byte[] k = rowKey(profile, entity, period, groups);
rowKeys.add(k);
@@ -115,6 +114,14 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
return rowKey(m.getProfileName(), m.getEntity(), m.getPeriod(), groups);
}
+ public void withPeriodDuration(long duration, TimeUnit units) {
+ periodDurationMillis = units.toMillis(duration);
+ }
+
+ public void setSaltDivisor(int saltDivisor) {
+ this.saltDivisor = saltDivisor;
+ }
+
/**
* Build the row key.
* @param profile The name of the profile.
@@ -176,11 +183,8 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
*/
private static byte[] timeKey(ProfilePeriod period) {
return ByteBuffer
- .allocate(4 * Integer.BYTES)
- .putInt(period.getYear())
- .putInt(period.getDayOfYear())
- .putInt(period.getHour())
- .putInt(period.getPeriod())
+ .allocate(Long.BYTES)
+ .putLong(period.getPeriod())
.array();
}
@@ -207,5 +211,4 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
}
}
- private static final double MINS_PER_HOUR = 60.0;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
index ada7365..7e05890 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
@@ -22,6 +22,9 @@ package org.apache.metron.profiler;
import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
import static org.junit.Assert.assertEquals;
/**
@@ -30,312 +33,88 @@ import static org.junit.Assert.assertEquals;
public class ProfilePeriodTest {
/**
- * Thu, Aug 25 2016 09:27:10 EST
* Thu, Aug 25 2016 13:27:10 GMT
- *
- * 238th day of the year
*/
private long AUG2016 = 1472131630748L;
- /**
- * The number of periods per hour must always ensure that the first period falls on the start of each hour. This
- * means that the number of periods must be a divisor or multiple of 60.
- */
- @Test(expected = RuntimeException.class)
- public void testInvalidPeriodsPerHour() {
- new ProfilePeriod(AUG2016, 241);
- }
-
@Test
- public void test1PeriodPerHour() {
- ProfilePeriod period = new ProfilePeriod(AUG2016, 1);
+ public void testFirstPeriodAtEpoch() {
+ long duration = 1;
+ TimeUnit units = TimeUnit.HOURS;
- assertEquals(2016, period.getYear());
- assertEquals(238, period.getDayOfYear());
- assertEquals(13, period.getHour());
+ ProfilePeriod period = new ProfilePeriod(0, duration, units);
assertEquals(0, period.getPeriod());
+ assertEquals(0, period.getStartTimeMillis());
+ assertEquals(units.toMillis(duration), period.getDurationMillis());
}
@Test
- public void test2PeriodsPerHour() {
- ProfilePeriod period = new ProfilePeriod(AUG2016, 2);
-
- assertEquals(2016, period.getYear());
- assertEquals(238, period.getDayOfYear());
- assertEquals(13, period.getHour());
- assertEquals(0, period.getPeriod());
+ public void testOneMinutePeriods() {
+ long duration = 1;
+ TimeUnit units = TimeUnit.MINUTES;
+
+ ProfilePeriod period = new ProfilePeriod(AUG2016, duration, units);
+ assertEquals(24535527, period.getPeriod());
+ assertEquals(1472131620000L, period.getStartTimeMillis()); // Thu, 25 Aug 2016 13:27:00 GMT
+ assertEquals(units.toMillis(duration), period.getDurationMillis());
}
@Test
- public void test3PeriodsPerHour() {
- ProfilePeriod period = new ProfilePeriod(AUG2016, 3);
-
- assertEquals(2016, period.getYear());
- assertEquals(238, period.getDayOfYear());
- assertEquals(13, period.getHour());
- assertEquals(1, period.getPeriod());
+ public void testFifteenMinutePeriods() {
+ long duration = 15;
+ TimeUnit units = TimeUnit.MINUTES;
+
+ ProfilePeriod period = new ProfilePeriod(AUG2016, duration, units);
+ assertEquals(1635701, period.getPeriod());
+ assertEquals(1472130900000L, period.getStartTimeMillis()); // Thu, 25 Aug 2016 13:15:00 GMT
+ assertEquals(units.toMillis(duration), period.getDurationMillis());
}
@Test
- public void test4PeriodsPerHour() {
- ProfilePeriod period = new ProfilePeriod(AUG2016, 4);
-
- assertEquals(2016, period.getYear());
- assertEquals(238, period.getDayOfYear());
- assertEquals(13, period.getHour());
- assertEquals(1, period.getPeriod());
+ public void testOneHourPeriods() {
+ long duration = 1;
+ TimeUnit units = TimeUnit.HOURS;
+
+ ProfilePeriod period = new ProfilePeriod(AUG2016, duration, units);
+ assertEquals(408925, period.getPeriod());
+ assertEquals(1472130000000L, period.getStartTimeMillis()); // Thu, 25 Aug 2016 13:00:00 GMT
+ assertEquals(units.toMillis(duration), period.getDurationMillis());
}
@Test
- public void test60PeriodsPerHour() {
- ProfilePeriod period = new ProfilePeriod(AUG2016, 60);
-
- assertEquals(2016, period.getYear());
- assertEquals(238, period.getDayOfYear());
- assertEquals(13, period.getHour());
- assertEquals(27, period.getPeriod());
- }
-
- @Test
- public void test240PeriodsPerHour() {
- ProfilePeriod period = new ProfilePeriod(AUG2016, 240);
-
- assertEquals(2016, period.getYear());
- assertEquals(238, period.getDayOfYear());
- assertEquals(13, period.getHour());
- assertEquals(108, period.getPeriod());
+ public void testTwoHourPeriods() {
+ long duration = 2;
+ TimeUnit units = TimeUnit.HOURS;
+
+ ProfilePeriod period = new ProfilePeriod(AUG2016, duration, units);
+ assertEquals(204462, period.getPeriod());
+ assertEquals(1472126400000L, period.getStartTimeMillis()); // Thu, 25 Aug 2016 12:00:00 GMT
+ assertEquals(units.toMillis(duration), period.getDurationMillis());
}
-
@Test
- public void testNextWith2PeriodsPerHour() {
- int periodsPerHour = 2;
-
- ProfilePeriod first = new ProfilePeriod(AUG2016, periodsPerHour);
- assertEquals(2016, first.getYear());
- assertEquals(238, first.getDayOfYear());
- assertEquals(13, first.getHour());
- assertEquals(0, first.getPeriod());
-
- // find the next period
- ProfilePeriod second = first.next();
- assertEquals(2016, second.getYear());
- assertEquals(238, second.getDayOfYear());
- assertEquals(13, second.getHour());
- assertEquals(1, second.getPeriod());
- assertEquals(periodsPerHour, second.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod third = second.next();
- assertEquals(2016, third.getYear());
- assertEquals(238, third.getDayOfYear());
- assertEquals(14, third.getHour());
- assertEquals(0, third.getPeriod());
- assertEquals(periodsPerHour, third.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod fourth = third.next();
- assertEquals(2016, fourth.getYear());
- assertEquals(238, fourth.getDayOfYear());
- assertEquals(14, fourth.getHour());
- assertEquals(1, fourth.getPeriod());
- assertEquals(periodsPerHour, fourth.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod fifth = fourth.next();
- assertEquals(2016, fifth.getYear());
- assertEquals(238, fifth.getDayOfYear());
- assertEquals(15, fifth.getHour());
- assertEquals(0, fifth.getPeriod());
- assertEquals(periodsPerHour, fifth.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod sixth = fifth.next();
- assertEquals(2016, sixth.getYear());
- assertEquals(238, sixth.getDayOfYear());
- assertEquals(15, sixth.getHour());
- assertEquals(1, sixth.getPeriod());
- assertEquals(periodsPerHour, sixth.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod seventh = sixth.next();
- assertEquals(2016, seventh.getYear());
- assertEquals(238, seventh.getDayOfYear());
- assertEquals(16, seventh.getHour());
- assertEquals(0, seventh.getPeriod());
- assertEquals(periodsPerHour, seventh.getPeriodsPerHour());
+ public void testEightHourPeriods() {
+ long duration = 8;
+ TimeUnit units = TimeUnit.HOURS;
+
+ ProfilePeriod period = new ProfilePeriod(AUG2016, duration, units);
+ assertEquals(51115, period.getPeriod());
+ assertEquals(1472112000000L, period.getStartTimeMillis()); // Thu, 25 Aug 2016 08:00:00 GMT
+ assertEquals(units.toMillis(duration), period.getDurationMillis());
}
@Test
- public void testNextWith4PeriodsPerHour() {
- int periodsPerHour = 4;
-
- ProfilePeriod first = new ProfilePeriod(AUG2016, periodsPerHour);
- assertEquals(2016, first.getYear());
- assertEquals(238, first.getDayOfYear());
- assertEquals(13, first.getHour());
- assertEquals(1, first.getPeriod());
-
- // find the next period
- ProfilePeriod second = first.next();
- assertEquals(2016, second.getYear());
- assertEquals(238, second.getDayOfYear());
- assertEquals(13, second.getHour());
- assertEquals(2, second.getPeriod());
- assertEquals(periodsPerHour, second.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod third = second.next();
- assertEquals(2016, third.getYear());
- assertEquals(238, third.getDayOfYear());
- assertEquals(13, third.getHour());
- assertEquals(3, third.getPeriod());
- assertEquals(periodsPerHour, third.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod fourth = third.next();
- assertEquals(2016, fourth.getYear());
- assertEquals(238, fourth.getDayOfYear());
- assertEquals(14, fourth.getHour());
- assertEquals(0, fourth.getPeriod());
- assertEquals(periodsPerHour, fourth.getPeriodsPerHour());
+ public void testNextWithFifteenMinutePeriods() {
+ long duration = 15;
+ TimeUnit units = TimeUnit.MINUTES;
+
+ ProfilePeriod previous = new ProfilePeriod(AUG2016, duration, units);
+ IntStream.range(0, 100).forEach(i -> {
+
+ ProfilePeriod next = previous.next();
+ assertEquals(previous.getPeriod() + 1, next.getPeriod());
+ assertEquals(previous.getStartTimeMillis() + previous.getDurationMillis(), next.getStartTimeMillis());
+ assertEquals(previous.getDurationMillis(), next.getDurationMillis());
+ });
}
-
- @Test
- public void testNextWith10PeriodsPerHour() {
- int periodsPerHour = 10;
-
- ProfilePeriod first = new ProfilePeriod(AUG2016, periodsPerHour);
- assertEquals(2016, first.getYear());
- assertEquals(238, first.getDayOfYear());
- assertEquals(13, first.getHour());
- assertEquals(4, first.getPeriod());
-
- // find the next period
- ProfilePeriod second = first.next();
- assertEquals(2016, second.getYear());
- assertEquals(238, second.getDayOfYear());
- assertEquals(13, second.getHour());
- assertEquals(5, second.getPeriod());
- assertEquals(periodsPerHour, second.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod third = second.next();
- assertEquals(2016, third.getYear());
- assertEquals(238, third.getDayOfYear());
- assertEquals(13, third.getHour());
- assertEquals(6, third.getPeriod());
- assertEquals(periodsPerHour, third.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod fourth = third.next();
- assertEquals(2016, fourth.getYear());
- assertEquals(238, fourth.getDayOfYear());
- assertEquals(13, fourth.getHour());
- assertEquals(7, fourth.getPeriod());
- assertEquals(periodsPerHour, fourth.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod fifth = fourth.next();
- assertEquals(2016, fifth.getYear());
- assertEquals(238, fifth.getDayOfYear());
- assertEquals(13, fifth.getHour());
- assertEquals(8, fifth.getPeriod());
- assertEquals(periodsPerHour, fifth.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod sixth = fifth.next();
- assertEquals(2016, sixth.getYear());
- assertEquals(238, sixth.getDayOfYear());
- assertEquals(13, sixth.getHour());
- assertEquals(9, sixth.getPeriod());
- assertEquals(periodsPerHour, sixth.getPeriodsPerHour());
-
- // find the next period
- ProfilePeriod seventh = sixth.next();
- assertEquals(2016, seventh.getYear());
- assertEquals(238, seventh.getDayOfYear());
- assertEquals(14, seventh.getHour());
- assertEquals(0, seventh.getPeriod());
- assertEquals(periodsPerHour, seventh.getPeriodsPerHour());
- }
-
- @Test
- public void testNextWith240PeriodsPerHour() {
- final int periodsPerHour = 240;
-
- ProfilePeriod p = new ProfilePeriod(AUG2016, periodsPerHour);
- assertEquals(2016, p.getYear());
- assertEquals(238, p.getDayOfYear());
- assertEquals(13, p.getHour());
- assertEquals(108, p.getPeriod());
-
- int lastPeriod = p.getPeriod();
- for(int i=0; i<(periodsPerHour - 108); i++) {
- p = p.next();
-
- // validate the next period
- assertEquals(2016, p.getYear());
- assertEquals(238, p.getDayOfYear());
- assertEquals(periodsPerHour, p.getPeriodsPerHour());
-
- int nextPeriod = lastPeriod + 1;
- boolean rolloverToNextHour = nextPeriod >= periodsPerHour;
- if(!rolloverToNextHour) {
- // still within the same hour
- assertEquals(13, p.getHour());
- assertEquals(nextPeriod, p.getPeriod());
-
- } else {
- // rollover to next hour
- assertEquals(14, p.getHour());
- assertEquals(0, p.getPeriod());
- break;
- }
-
- lastPeriod = p.getPeriod();
- }
- }
-
- /**
- * With 2 periods per hour, 'Thu, Aug 25 2016 13:27:10 GMT' falls within the 1st period.
- * Period starts at 'Thu, Aug 25 2016 13:00:00 000 GMT' ~ 1472130000000L
- */
- @Test
- public void testTimeInMillisWith2PeriodsPerHour() {
- final ProfilePeriod period = new ProfilePeriod(AUG2016, 2);
- assertEquals(1472130000000L, period.getTimeInMillis());
- }
-
- /**
- * With 4 periods per hour, 'Thu, Aug 25 2016 13:27:10 GMT' falls within the 2nd period.
- * Period starts at 'Thu, Aug 25 2016 13:15:00 000 GMT' ~ 1472130900000L
- */
- @Test
- public void testTimeInMillisWith4PeriodsPerHour() {
- final ProfilePeriod period = new ProfilePeriod(AUG2016, 4);
- assertEquals(1472130900000L, period.getTimeInMillis());
- }
-
- /**
- * With 60 periods per hour, 'Thu, Aug 25 2016 13:27:10 GMT' falls within the 27th period.
- * Period starts at 'Thu, Aug 25 2016 13:27:00 000 GMT' ~ 1472131620000L
- */
- @Test
- public void testTimeInMillisWith60PeriodsPerHour() {
- final ProfilePeriod period = new ProfilePeriod(AUG2016, 60);
- assertEquals(1472131620000L, period.getTimeInMillis());
- }
-
- /**
- * With 240 periods per hour, 'Thu, Aug 25 2016 13:27:10 GMT' falls within the 108th period.
- * Period starts at 'Thu, Aug 25 2016 13:27:00 000 GMT' ~ 1472131620000L
- */
- @Test
- public void testTimeInMillisWith240PeriodsPerHour() {
- final ProfilePeriod period = new ProfilePeriod(AUG2016, 240);
- assertEquals(1472131620000L, period.getTimeInMillis());
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index df8b335..415d89f 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -37,6 +37,7 @@ import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
@@ -164,7 +165,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
bolt.setCuratorFramework(client);
bolt.setTreeCache(cache);
bolt.setExecutor(new DefaultStellarExecutor());
- bolt.setPeriodsPerHour(4);
+ bolt.setPeriodDurationMillis(TimeUnit.MINUTES.toMillis(15));
bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
return bolt;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
index 1fb25a7..4834072 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
@@ -31,6 +31,7 @@ import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -59,7 +60,7 @@ public class ProfileHBaseMapperTest {
mapper.setExecutor(executor);
mapper.setRowKeyBuilder(rowKeyBuilder);
- measurement = new ProfileMeasurement("profile", "entity", 20000, 4);
+ measurement = new ProfileMeasurement("profile", "entity", 20000, 15, TimeUnit.MINUTES);
measurement.setValue(22);
// the tuple will contain the original message
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
index dce7757..b4ebcf2 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
@@ -47,17 +47,15 @@ import static org.mockito.Mockito.when;
public class SaltyRowKeyBuilderTest {
private static final int saltDivisor = 1000;
- private static final int periodsPerHour = 4;
+ private static final long periodDuration = 15;
+ private static final TimeUnit periodUnits = TimeUnit.MINUTES;
private SaltyRowKeyBuilder rowKeyBuilder;
private ProfileMeasurement measurement;
private Tuple tuple;
/**
- * Thu, Aug 25 2016 09:27:10 EST
* Thu, Aug 25 2016 13:27:10 GMT
- *
- * 238th day of the year
*/
private long AUG2016 = 1472131630748L;
@@ -65,7 +63,7 @@ public class SaltyRowKeyBuilderTest {
public void setup() throws Exception {
// a profile measurement
- measurement = new ProfileMeasurement("profile", "entity", AUG2016, periodsPerHour);
+ measurement = new ProfileMeasurement("profile", "entity", AUG2016, periodDuration, periodUnits);
measurement.setValue(22);
// the tuple will contain the original message
@@ -79,7 +77,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithOneGroup() throws Exception {
// setup
- rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+ rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodUnits);
List<Object> groups = Arrays.asList("group1");
// the expected row key
@@ -89,10 +87,7 @@ public class SaltyRowKeyBuilderTest {
.put(measurement.getProfileName().getBytes())
.put(measurement.getEntity().getBytes())
.put("group1".getBytes())
- .putInt(2016)
- .putInt(238)
- .putInt(13)
- .putInt(1);
+ .putLong(1635701L);
buffer.flip();
final byte[] expected = new byte[buffer.limit()];
@@ -109,7 +104,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithTwoGroups() throws Exception {
// setup
- rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+ rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodUnits);
List<Object> groups = Arrays.asList("group1","group2");
// the expected row key
@@ -120,10 +115,7 @@ public class SaltyRowKeyBuilderTest {
.put(measurement.getEntity().getBytes())
.put("group1".getBytes())
.put("group2".getBytes())
- .putInt(2016)
- .putInt(238)
- .putInt(13)
- .putInt(1);
+ .putLong(1635701L);
buffer.flip();
final byte[] expected = new byte[buffer.limit()];
@@ -140,7 +132,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithOneIntegerGroup() throws Exception {
// setup
- rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+ rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodUnits);
List<Object> groups = Arrays.asList(200);
// the expected row key
@@ -150,10 +142,7 @@ public class SaltyRowKeyBuilderTest {
.put(measurement.getProfileName().getBytes())
.put(measurement.getEntity().getBytes())
.put("200".getBytes())
- .putInt(2016)
- .putInt(238)
- .putInt(13)
- .putInt(1);
+ .putLong(1635701L);
buffer.flip();
final byte[] expected = new byte[buffer.limit()];
@@ -170,7 +159,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithMixedGroups() throws Exception {
// setup
- rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+ rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodUnits);
List<Object> groups = Arrays.asList(200, "group1");
// the expected row key
@@ -181,10 +170,7 @@ public class SaltyRowKeyBuilderTest {
.put(measurement.getEntity().getBytes())
.put("200".getBytes())
.put("group1".getBytes())
- .putInt(2016)
- .putInt(238)
- .putInt(13)
- .putInt(1);
+ .putLong(1635701L);
buffer.flip();
final byte[] expected = new byte[buffer.limit()];
@@ -201,7 +187,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithNoGroup() throws Exception {
// setup
- rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+ rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodUnits);
List<Object> groups = Collections.emptyList();
// the expected row key
@@ -210,10 +196,7 @@ public class SaltyRowKeyBuilderTest {
.put(SaltyRowKeyBuilder.getSalt(measurement.getPeriod(), saltDivisor))
.put(measurement.getProfileName().getBytes())
.put(measurement.getEntity().getBytes())
- .putInt(2016)
- .putInt(238)
- .putInt(13)
- .putInt(1);
+ .putLong(1635701L);
buffer.flip();
final byte[] expected = new byte[buffer.limit()];
@@ -233,16 +216,16 @@ public class SaltyRowKeyBuilderTest {
// setup
List<Object> groups = Collections.emptyList();
- rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodsPerHour);
+ rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodUnits);
// a dummy profile measurement
long oldest = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hoursAgo);
- ProfileMeasurement m = new ProfileMeasurement("profile", "entity", oldest, periodsPerHour);
+ ProfileMeasurement m = new ProfileMeasurement("profile", "entity", oldest, periodDuration, periodUnits);
m.setValue(22);
// generate a list of expected keys
List<byte[]> expectedKeys = new ArrayList<>();
- for (int i=0; i<(hoursAgo * periodsPerHour)+1; i++) {
+ for (int i=0; i<(hoursAgo * 4)+1; i++) {
// generate the expected key
byte[] rk = rowKeyBuilder.rowKey(m, groups);
@@ -250,7 +233,7 @@ public class SaltyRowKeyBuilderTest {
// advance to the next period
ProfilePeriod next = m.getPeriod().next();
- m = new ProfileMeasurement("profile", "entity", next.getTimeInMillis(), periodsPerHour);
+ m = new ProfileMeasurement("profile", "entity", next.getStartTimeMillis(), periodDuration, periodUnits);
}
// execute
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/baf0d24a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 03e25a4..91191c8 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -265,7 +265,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
setProperty("profiler.workers", "1");
setProperty("profiler.executors", "0");
setProperty("profiler.input.topic", Constants.INDEXING_TOPIC);
- setProperty("profiler.periods.per.hour", "240");
+ setProperty("profiler.period.duration", "5");
+ setProperty("profiler.period.duration.units", "SECONDS");
setProperty("profiler.hbase.salt.divisor", "10");
setProperty("profiler.hbase.table", tableName);
setProperty("profiler.hbase.column.family", columnFamily);