You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hivemall.apache.org by my...@apache.org on 2016/12/02 08:02:11 UTC
[21/50] [abbrv] incubator-hivemall git commit: Implement initial
SST-based change-point detector
Implement initial SST-based change-point detector
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/3ebd771e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/3ebd771e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/3ebd771e
Branch: refs/heads/JIRA-22/pr-356
Commit: 3ebd771ee4bebf14769b7c240f8b28b9d5d10e86
Parents: 89ec56e
Author: Takuya Kitazawa <k....@gmail.com>
Authored: Mon Sep 26 17:12:01 2016 +0900
Committer: Takuya Kitazawa <k....@gmail.com>
Committed: Mon Sep 26 17:12:01 2016 +0900
----------------------------------------------------------------------
.../java/hivemall/anomaly/SSTChangePoint.java | 118 +++++++++++
.../hivemall/anomaly/SSTChangePointUDF.java | 197 +++++++++++++++++++
.../hivemall/anomaly/SSTChangePointTest.java | 111 +++++++++++
3 files changed, 426 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/3ebd771e/core/src/main/java/hivemall/anomaly/SSTChangePoint.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/anomaly/SSTChangePoint.java b/core/src/main/java/hivemall/anomaly/SSTChangePoint.java
new file mode 100644
index 0000000..e693bd4
--- /dev/null
+++ b/core/src/main/java/hivemall/anomaly/SSTChangePoint.java
@@ -0,0 +1,118 @@
+/*
+ * Hivemall: Hive scalable Machine Learning Library
+ *
+ * Copyright (C) 2015 Makoto YUI
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hivemall.anomaly;
+
+import hivemall.anomaly.SSTChangePointUDF.SSTChangePointInterface;
+import hivemall.anomaly.SSTChangePointUDF.Parameters;
+import hivemall.utils.collections.DoubleRingBuffer;
+import org.apache.commons.math3.linear.MatrixUtils;
+import org.apache.commons.math3.linear.RealMatrix;
+import org.apache.commons.math3.linear.SingularValueDecomposition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+import java.util.Arrays;
+
+import javax.annotation.Nonnull;
+
+final class SSTChangePoint implements SSTChangePointInterface {
+
+ @Nonnull
+ private final PrimitiveObjectInspector oi;
+
+ @Nonnull
+ private final int window;
+ @Nonnull
+ private final int nPastWindow;
+ @Nonnull
+ private final int nCurrentWindow;
+ @Nonnull
+ private final int pastSize;
+ @Nonnull
+ private final int currentSize;
+ @Nonnull
+ private final int currentOffset;
+ @Nonnull
+ private final int r;
+
+ @Nonnull
+ private final DoubleRingBuffer xRing;
+ @Nonnull
+ private final double[] xSeries;
+
+ SSTChangePoint(@Nonnull Parameters params, @Nonnull PrimitiveObjectInspector oi) {
+ this.oi = oi;
+
+ this.window = params.w;
+ this.nPastWindow = params.n;
+ this.nCurrentWindow = params.m;
+ this.pastSize = window + nPastWindow;
+ this.currentSize = window + nCurrentWindow;
+ this.currentOffset = params.g;
+ this.r = params.r;
+
+ // (w + n) past samples for the n-past-windows
+ // (w + m) current samples for the m-current-windows, starting from offset g
+ // => need to hold past (w + n + g + w + m) samples from the latest sample
+ int holdSampleSize = pastSize + currentOffset + currentSize;
+
+ this.xRing = new DoubleRingBuffer(holdSampleSize);
+ this.xSeries = new double[holdSampleSize];
+ }
+
+ @Override
+ public void update(@Nonnull final Object arg, @Nonnull final double[] outScores)
+ throws HiveException {
+ double x = PrimitiveObjectInspectorUtils.getDouble(arg, oi);
+ xRing.add(x).toArray(xSeries, true /* FIFO */);
+
+ // need to wait until the buffer is filled
+ if (!xRing.isFull()) {
+ outScores[0] = 0.d;
+ } else {
+ outScores[0] = computeScore();
+ }
+ }
+
+ private double computeScore() {
+ // create past trajectory matrix and find its left singular vectors
+ RealMatrix H = MatrixUtils.createRealMatrix(window, nPastWindow);
+ for (int i = 0; i < nPastWindow; i++) {
+ H.setColumn(i, Arrays.copyOfRange(xSeries, i, i + window));
+ }
+ SingularValueDecomposition svdH = new SingularValueDecomposition(H);
+ RealMatrix UT = svdH.getUT();
+
+ // create current trajectory matrix and find its left singular vectors
+ RealMatrix G = MatrixUtils.createRealMatrix(window, nCurrentWindow);
+ int currentHead = pastSize + currentOffset;
+ for (int i = 0; i < nCurrentWindow; i++) {
+ G.setColumn(i, Arrays.copyOfRange(xSeries, currentHead + i, currentHead + i + window));
+ }
+ SingularValueDecomposition svdG = new SingularValueDecomposition(G);
+ RealMatrix Q = svdG.getU();
+
+ // find the largest singular value for the r principal components
+ RealMatrix UTQ = UT.getSubMatrix(0, r - 1, 0, window - 1).multiply(Q.getSubMatrix(0, window - 1, 0, r - 1));
+ SingularValueDecomposition svdUTQ = new SingularValueDecomposition(UTQ);
+ double[] s = svdUTQ.getSingularValues();
+
+ return 1.d - s[0];
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/3ebd771e/core/src/main/java/hivemall/anomaly/SSTChangePointUDF.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/anomaly/SSTChangePointUDF.java b/core/src/main/java/hivemall/anomaly/SSTChangePointUDF.java
new file mode 100644
index 0000000..3ab5ae8
--- /dev/null
+++ b/core/src/main/java/hivemall/anomaly/SSTChangePointUDF.java
@@ -0,0 +1,197 @@
+/*
+ * Hivemall: Hive scalable Machine Learning Library
+ *
+ * Copyright (C) 2015 Makoto YUI
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hivemall.anomaly;
+
+import hivemall.UDFWithOptions;
+import hivemall.utils.collections.DoubleRingBuffer;
+import hivemall.utils.hadoop.HiveUtils;
+import hivemall.utils.lang.Preconditions;
+import hivemall.utils.lang.Primitives;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BooleanWritable;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+@Description(
+ name = "sst_changepoint",
+ value = "_FUNC_(double|array<double> x [, const string options])"
+ + " - Returns change-point scores and decisions using Singular Spectrum Transformation (SST)."
+ + " It will return a tuple <double changepoint_score [, boolean is_changepoint]>")
+public final class SSTChangePointUDF extends UDFWithOptions {
+
+ private transient Parameters _params;
+ private transient SSTChangePoint _sst;
+
+ private transient double[] _scores;
+ private transient Object[] _result;
+ private transient DoubleWritable _changepointScore;
+ @Nullable
+ private transient BooleanWritable _isChangepoint = null;
+
+ public SSTChangePointUDF() {}
+
+ // Visible for testing
+ Parameters getParameters() {
+ return _params;
+ }
+
+ @Override
+ protected Options getOptions() {
+ Options opts = new Options();
+ opts.addOption("w", "window", true, "Number of samples which affects change-point score [default: 30]");
+ opts.addOption("n", "n_past", true,
+ "Number of past windows for change-point scoring [default: equal to `w` = 30]");
+ opts.addOption("m", "n_current", true,
+ "Number of current windows for change-point scoring [default: equal to `w` = 30]");
+ opts.addOption("g", "current_offset", true,
+ "Offset of the current windows from the updating sample [default: `-w` = -30]");
+ opts.addOption("r", "n_component", true,
+ "Number of singular vectors (i.e. principal components) [default: 3]");
+ opts.addOption("k", "n_dim", true,
+ "Number of dimensions for the Krylov subspaces [default: 5 (`2*r` if `r` is even, `2*r-1` otherwise)]");
+ opts.addOption("th", "threshold", true,
+ "Score threshold (inclusive) for determining change-point existence [default: -1, do not output decision]");
+ return opts;
+ }
+
+ @Override
+ protected CommandLine processOptions(String optionValues) throws UDFArgumentException {
+ CommandLine cl = parseOptions(optionValues);
+
+ this._params.w = Primitives.parseInt(cl.getOptionValue("w"), _params.w);
+ this._params.n = Primitives.parseInt(cl.getOptionValue("n"), _params.w);
+ this._params.m = Primitives.parseInt(cl.getOptionValue("m"), _params.w);
+ this._params.g = Primitives.parseInt(cl.getOptionValue("g"), -1 * _params.w);
+ this._params.r = Primitives.parseInt(cl.getOptionValue("r"), _params.r);
+ this._params.k = Primitives.parseInt(
+ cl.getOptionValue("k"), (_params.r % 2 == 0) ? (2 * _params.r) : (2 * _params.r - 1));
+ this._params.changepointThreshold = Primitives.parseDouble(
+ cl.getOptionValue("th"), _params.changepointThreshold);
+
+ Preconditions.checkArgument(_params.w >= 2, "w must be greather than 1: " + _params.w);
+ Preconditions.checkArgument(_params.r >= 1, "r must be greater than 0: " + _params.r);
+ Preconditions.checkArgument(_params.k >= 1, "k must be greater than 0: " + _params.k);
+ Preconditions.checkArgument(_params.changepointThreshold > 0.d && _params.changepointThreshold < 1.d,
+ "changepointThreshold must be in range (0, 1): " + _params.changepointThreshold);
+
+ return cl;
+ }
+
+ @Override
+ public ObjectInspector initialize(@Nonnull ObjectInspector[] argOIs)
+ throws UDFArgumentException {
+ if (argOIs.length < 1 || argOIs.length > 2) {
+ throw new UDFArgumentException(
+ "_FUNC_(double|array<double> x [, const string options]) takes 1 or 2 arguments: "
+ + Arrays.toString(argOIs));
+ }
+
+ this._params = new Parameters();
+ if (argOIs.length == 2) {
+ String options = HiveUtils.getConstString(argOIs[1]);
+ processOptions(options);
+ }
+
+ ObjectInspector argOI0 = argOIs[0];
+ PrimitiveObjectInspector xOI = HiveUtils.asDoubleCompatibleOI(argOI0);
+ this._sst = new SSTChangePoint(_params, xOI);
+
+ this._scores = new double[1];
+
+ final Object[] result;
+ final ArrayList<String> fieldNames = new ArrayList<String>();
+ final ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+ fieldNames.add("changepoint_score");
+ fieldOIs.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ if (_params.changepointThreshold != -1d) {
+ fieldNames.add("is_changepoint");
+ fieldOIs.add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
+ result = new Object[2];
+ this._isChangepoint = new BooleanWritable(false);
+ result[1] = _isChangepoint;
+ } else {
+ result = new Object[1];
+ }
+ this._changepointScore = new DoubleWritable(0.d);
+ result[0] = _changepointScore;
+ this._result = result;
+
+ return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
+ }
+
+ @Override
+ public Object[] evaluate(@Nonnull DeferredObject[] args) throws HiveException {
+ Object x = args[0].get();
+ if (x == null) {
+ return _result;
+ }
+
+ _sst.update(x, _scores);
+
+ double changepointScore = _scores[0];
+ _changepointScore.set(changepointScore);
+ if (_isChangepoint != null) {
+ _isChangepoint.set(changepointScore >= _params.changepointThreshold);
+ }
+
+ return _result;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this._result = null;
+ this._changepointScore = null;
+ this._isChangepoint = null;
+ }
+
+ @Override
+ public String getDisplayString(String[] children) {
+ return "sst(" + Arrays.toString(children) + ")";
+ }
+
+ static final class Parameters {
+ int w = 30;
+ int n = 30;
+ int m = 30;
+ int g = -30;
+ int r = 3;
+ int k = 5;
+ double changepointThreshold = -1.d;
+
+ Parameters() {}
+ }
+
+ public interface SSTChangePointInterface {
+ void update(@Nonnull Object arg, @Nonnull double[] outScores) throws HiveException;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/3ebd771e/core/src/test/java/hivemall/anomaly/SSTChangePointTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/anomaly/SSTChangePointTest.java b/core/src/test/java/hivemall/anomaly/SSTChangePointTest.java
new file mode 100644
index 0000000..b41d474
--- /dev/null
+++ b/core/src/test/java/hivemall/anomaly/SSTChangePointTest.java
@@ -0,0 +1,111 @@
+/*
+ * Hivemall: Hive scalable Machine Learning Library
+ *
+ * Copyright (C) 2015 Makoto YUI
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hivemall.anomaly;
+
+import hivemall.anomaly.SSTChangePointUDF.Parameters;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.zip.GZIPInputStream;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSTChangePointTest {
+ private static final boolean DEBUG = false;
+
+ @Test
+ public void testSST() throws IOException, HiveException {
+ Parameters params = new Parameters();
+ PrimitiveObjectInspector oi = PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
+ SSTChangePoint sst = new SSTChangePoint(params, oi);
+ double[] outScores = new double[1];
+
+ BufferedReader reader = readFile("cf1d.csv");
+ println("x change");
+ String line;
+ int numChangepoints = 0;
+ while ((line = reader.readLine()) != null) {
+ double x = Double.parseDouble(line);
+ sst.update(x, outScores);
+ printf("%f %f%n", x, outScores[0]);
+ if (outScores[0] > 0.95d) {
+ numChangepoints++;
+ }
+ }
+ Assert.assertTrue("#changepoints SHOULD be greater than 0: " + numChangepoints,
+ numChangepoints > 0);
+ Assert.assertTrue("#changepoints SHOULD be less than 5: " + numChangepoints,
+ numChangepoints < 5);
+ }
+
+ @Test
+ public void testTwitterData() throws IOException, HiveException {
+ Parameters params = new Parameters();
+ PrimitiveObjectInspector oi = PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
+ SSTChangePoint sst = new SSTChangePoint(params, oi);
+ double[] outScores = new double[1];
+
+ BufferedReader reader = readFile("twitter.csv.gz");
+ println("# time x change");
+ String line;
+ int i = 1, numChangepoints = 0;
+ while ((line = reader.readLine()) != null) {
+ double x = Double.parseDouble(line);
+ sst.update(x, outScores);
+ printf("%d %f %f%n", i, x, outScores[0]);
+ if (outScores[0] > 0.005d) {
+ numChangepoints++;
+ }
+ i++;
+ }
+ Assert.assertTrue("#changepoints SHOULD be greater than 0: " + numChangepoints,
+ numChangepoints > 0);
+ Assert.assertTrue("#changepoints SHOULD be less than 5: " + numChangepoints,
+ numChangepoints < 5);
+ }
+
+ private static void println(String msg) {
+ if (DEBUG) {
+ System.out.println(msg);
+ }
+ }
+
+ private static void printf(String format, Object... args) {
+ if (DEBUG) {
+ System.out.printf(format, args);
+ }
+ }
+
+ @Nonnull
+ private static BufferedReader readFile(@Nonnull String fileName) throws IOException {
+ InputStream is = SSTChangePointTest.class.getResourceAsStream(fileName);
+ if (fileName.endsWith(".gz")) {
+ is = new GZIPInputStream(is);
+ }
+ return new BufferedReader(new InputStreamReader(is));
+ }
+
+}