You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2012/10/05 22:19:44 UTC

svn commit: r1394783 - in /hama/trunk/ml/src/main/java/org/apache/hama/ml/regression: ./ GradientDescentBSP.java

Author: tommaso
Date: Fri Oct  5 20:19:43 2012
New Revision: 1394783

URL: http://svn.apache.org/viewvc?rev=1394783&view=rev
Log:
[HAMA-651] - adding first basic unregularized version of a gradient descent BSP

Added:
    hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/
    hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java   (with props)

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java?rev=1394783&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java Fri Oct  5 20:19:43 2012
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.regression;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.ml.math.DenseDoubleVector;
+import org.apache.hama.ml.math.DoubleVector;
+import org.apache.hama.ml.writable.VectorWritable;
+import org.apache.hama.util.KeyValuePair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A gradient descent (see <code>http://en.wikipedia.org/wiki/Gradient_descent</code>) BSP based abstract implementation.
+ * Each extending class should implement the #hypothesis(DoubleVector theta, DoubleVector x) method for a specific
+ */
+public abstract class GradientDescentBSP extends BSP<VectorWritable, DoubleWritable, NullWritable, NullWritable, VectorWritable> {
+
+    private static final Logger log = LoggerFactory.getLogger(GradientDescentBSP.class);
+    static final String INITIAL_THETA_VALUES = "initial.theta.values";
+    static final String ALPHA = "alpha";
+
+    private boolean master;
+    private DoubleVector theta;
+
+    @Override
+    public void setup(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable, VectorWritable> peer) throws IOException, SyncException, InterruptedException {
+        master = peer.getPeerIndex() == peer.getNumPeers() / 2;
+    }
+
+    @Override
+    public void bsp(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable, VectorWritable> peer) throws IOException, SyncException, InterruptedException {
+
+        while (true) {
+
+            getTheta(peer);
+
+            // first superstep : calculate cost function in parallel
+
+            double localCost = 0d;
+
+            int numRead = 0;
+
+            // read an input
+            KeyValuePair<VectorWritable, DoubleWritable> kvp;
+            while ((kvp = peer.readNext()) != null) {
+
+                // calculate cost for given input
+                double y = kvp.getValue().get();
+                DoubleVector x = kvp.getKey().getVector();
+                double costForX = y * Math.log(hypothesis(theta, x)) + (1 - y) * Math.log(1 - hypothesis(theta, x));
+
+                // adds to local cost
+                localCost += costForX;
+                numRead++;
+            }
+
+            // cost is sent and aggregated by each
+            double totalCost = localCost;
+
+            for (String peerName : peer.getAllPeerNames()) {
+                peer.send(peerName, new VectorWritable(new DenseDoubleVector(new double[]{localCost, numRead})));
+            }
+            peer.sync();
+
+            // second superstep : cost calculation
+
+            VectorWritable costResult;
+            while ((costResult = peer.getCurrentMessage()) != null) {
+                totalCost += costResult.getVector().get(0);
+                numRead += costResult.getVector().get(1);
+            }
+
+            totalCost = totalCost * (-1 / numRead);
+            if (log.isInfoEnabled()) {
+                log.info("cost is " + totalCost);
+            }
+
+            peer.sync();
+
+            peer.reopenInput();
+
+            double[] thetaDelta = new double[theta.getLength()];
+
+            // second superstep : calculate partial derivatives in parallel
+            while ((kvp = peer.readNext()) != null) {
+                DoubleVector x = kvp.getKey().getVector();
+                double y = kvp.getValue().get();
+                double difference = hypothesis(theta, x) - y;
+                for (int j = 0; j < theta.getLength(); j++) {
+                    thetaDelta[j] += difference * x.get(j);
+                }
+            }
+
+            // send thetaDelta to the each peer
+            for (String peerName : peer.getAllPeerNames()) {
+                peer.send(peerName, new VectorWritable(new DenseDoubleVector(thetaDelta)));
+            }
+
+            peer.sync();
+
+            VectorWritable thetaDeltaSlice;
+            while ((thetaDeltaSlice = peer.getCurrentMessage()) != null) {
+                double[] newTheta = new double[theta.getLength()];
+
+                for (int j = 0; j < theta.getLength(); j++) {
+                    newTheta[j] += thetaDeltaSlice.getVector().get(j);
+                }
+
+                for (int j = 0; j < theta.getLength(); j++) {
+                    newTheta[j] = theta.get(j) - newTheta[j] * peer.getConfiguration().getFloat(ALPHA, 0.3f);
+                }
+
+                theta = new DenseDoubleVector(newTheta);
+
+                if (log.isInfoEnabled()) {
+                    log.info("new theta for cost " + totalCost + " is " + theta.toArray().toString());
+                }
+            }
+            peer.sync();
+
+            // eventually break execution !?
+            if (totalCost == 0) {
+                // TODO change this as just 0 is too strict
+                break;
+            }
+        }
+
+    }
+
+    /**
+     * Applies the hypothesis given a set of parameters theta to a given input x
+     *
+     * @param theta the parameters vector
+     * @param x     the input
+     * @return a <code>double</code> number
+     */
+    public abstract double hypothesis(DoubleVector theta, DoubleVector x);
+
+
+    public void getTheta(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable, VectorWritable> peer) throws IOException, SyncException, InterruptedException {
+        if (master && theta == null) {
+            int size = getXSize(peer);
+            theta = new DenseDoubleVector(size, peer.getConfiguration().getInt(INITIAL_THETA_VALUES, 10));
+            for (String peerName : peer.getAllPeerNames()) {
+                peer.send(peerName, new VectorWritable(theta));
+            }
+            peer.sync();
+        } else {
+            peer.sync();
+            VectorWritable vectorWritable = peer.getCurrentMessage();
+            theta = vectorWritable.getVector();
+        }
+    }
+
+    private int getXSize(BSPPeer<VectorWritable, DoubleWritable, NullWritable, NullWritable, VectorWritable> peer) throws IOException {
+        VectorWritable key = null;
+        peer.readNext(key, null);
+        if (key == null) {
+            throw new IOException("cannot read input vector size");
+        }
+        return key.getVector().getLength();
+    }
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/regression/GradientDescentBSP.java
------------------------------------------------------------------------------
    svn:eol-style = native