Posted to dev@horn.apache.org by ed...@apache.org on 2016/05/10 04:17:40 UTC

incubator-horn git commit: HORN-14: Add MNIST performance evaluator

Repository: incubator-horn
Updated Branches:
  refs/heads/master 1a3500f7a -> 9f35e9fb2


HORN-14: Add MNIST performance evaluator


Project: http://git-wip-us.apache.org/repos/asf/incubator-horn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-horn/commit/9f35e9fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-horn/tree/9f35e9fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-horn/diff/9f35e9fb

Branch: refs/heads/master
Commit: 9f35e9fb2171d6b08345f7863fc0dd2047759cfb
Parents: 1a3500f
Author: Edward J. Yoon <ed...@apache.org>
Authored: Thu Apr 28 16:53:26 2016 +0900
Committer: Edward J. Yoon <ed...@apache.org>
Committed: Tue May 10 11:14:45 2016 +0900

----------------------------------------------------------------------
 README.md                                       |  29 ++-
 conf/horn-env.sh                                |   2 +-
 .../horn/core/AbstractLayeredNeuralNetwork.java |  53 ++++-
 .../apache/horn/core/AbstractNeuralNetwork.java |  68 ++++--
 .../java/org/apache/horn/core/AutoEncoder.java  |   3 +-
 .../java/org/apache/horn/core/Constants.java    |  31 +++
 src/main/java/org/apache/horn/core/HornJob.java |  26 ++-
 .../apache/horn/core/LayeredNeuralNetwork.java  |  62 ++----
 .../horn/core/LayeredNeuralNetworkTrainer.java  | 218 +++++++++++--------
 .../apache/horn/core/ParameterMergerServer.java |  14 +-
 .../org/apache/horn/core/ParameterMessage.java  |  37 ++--
 .../horn/examples/MultiLayerPerceptron.java     |  36 +--
 .../org/apache/horn/funcs/CrossEntropy.java     |  22 +-
 .../org/apache/horn/utils/MNISTConverter.java   |  14 +-
 .../org/apache/horn/utils/MNISTEvaluator.java   | 111 ++++++++++
 .../java/org/apache/horn/core/TestNeuron.java   |   4 +-
 .../core/TestSmallLayeredNeuralNetwork.java     |   4 +-
 .../horn/examples/MultiLayerPerceptronTest.java |  23 +-
 18 files changed, 514 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index f30ebc6..cb217f6 100644
--- a/README.md
+++ b/README.md
@@ -23,30 +23,29 @@ Then, we measure the margin of error of the output and adjust the weights accord
     public void backward(
         Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
         throws IOException {
+      double gradient = 0;
       for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
         // Calculates error gradient for each neuron
-        double gradient = this.squashingFunction.applyDerivative(this
-            .getOutput()) * (m.getDelta() * m.getWeight());
-        this.backpropagate(gradient);
+        gradient += (m.getDelta() * m.getWeight());
 
         // Weight corrections
         double weight = -this.getLearningRate() * this.getOutput()
             * m.getDelta() + this.getMomentumWeight() * m.getPrevWeight();
         this.push(weight);
       }
+
+      this.backpropagate(gradient
+          * this.squashingFunction.applyDerivative(this.getOutput()));
     }
   }
 ```
-The advantages of this programming model are:
-
- * Easy and intuitive to use
- * Flexible to make your own CUDA kernels
- * Allows multithreading to be used internally
+The advantage of this programming model is that it is easy and intuitive to use.
 
 Also, Apache Horn provides a simplified and intuitive configuration interface. To create a neural network job and submit it to an existing Hadoop or Hama cluster, we just add each layer with its properties, such as the squashing function and neuron class. The example below configures a 4-layer neural network with 500 neurons in the hidden layers for training on the MNIST dataset:
 ```Java
   HornJob job = new HornJob(conf, MultiLayerPerceptron.class);
   job.setLearningRate(learningRate);
+  job.setTrainingMethod(TrainingMethod.GRADIENT_DESCENT);
   ..
 
   job.inputLayer(784, Sigmoid.class, StandardNeuron.class);
@@ -56,6 +55,20 @@ Also, Apache Horn provides a simplified and intuitive configuration interface. T
   job.setCostFunction(CrossEntropy.class);
 ```
 
+## Quick Run Example
+
+Download the MNIST training images and labels, and convert them into an HDFS sequence file with the following command:
+```
+ % bin/horn jar horn-0.x.0.jar MNISTConverter train-images.idx3-ubyte train-labels.idx1-ubyte /tmp/mnist.seq 
+```
+
+Then, train the model with the following command (in this example, we used η 0.002, λ 0.1, 100 hidden units, and a minibatch size of 10):
+```
+ % bin/horn jar horn-0.x.0.jar MultiLayerPerceptron /tmp/model /tmp/mnist.seq \
+   0.002 0.0 0.1 784 100 10 10 12000
+ 
+```
+
 ## High Scalability
 
 Apache Horn is a Sync and Async hybrid distributed training framework. Within a single BSP job, each task group works asynchronously using region barrier synchronization instead of global barrier synchronization, and trains a large-scale neural network model on its assigned data sets in a synchronous way.

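For reference, once training finishes the new MNISTEvaluator can score the model against the MNIST test set. A possible invocation, assuming the evaluator is launched through the same `bin/horn jar` entry point as the converter (its usage string expects a trained model path plus the test image and label files):
```
 % bin/horn jar horn-0.x.0.jar MNISTEvaluator /tmp/model t10k-images.idx3-ubyte t10k-labels.idx1-ubyte
```
It prints the fraction of test images whose arg-max output matches the label.
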
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/conf/horn-env.sh
----------------------------------------------------------------------
diff --git a/conf/horn-env.sh b/conf/horn-env.sh
index 9f8d9c2..a033fe0 100644
--- a/conf/horn-env.sh
+++ b/conf/horn-env.sh
@@ -22,5 +22,5 @@
 # Set environment variables here.
 
 # The java implementation to use.  Required.
-export JAVA_HOME=/usr/lib/jvm/java-8-oracle
+export JAVA_HOME=/usr/lib/jvm/java-8-oracle/
 

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
index b0162ab..e415a25 100644
--- a/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
@@ -28,6 +28,8 @@ import org.apache.hama.commons.math.DoubleDoubleFunction;
 import org.apache.hama.commons.math.DoubleFunction;
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.core.Constants.LearningStyle;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.funcs.FunctionFactory;
 
 import com.google.common.base.Preconditions;
@@ -45,10 +47,14 @@ import com.google.common.collect.Lists;
  */
 abstract class AbstractLayeredNeuralNetwork extends AbstractNeuralNetwork {
 
+  private static final double DEFAULT_REGULARIZATION_WEIGHT = 0;
   private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1;
 
   double trainingError;
 
+  /* The weight of regularization */
+  protected double regularizationWeight;
+
   /* The momentumWeight */
   protected double momentumWeight;
 
@@ -62,16 +68,8 @@ abstract class AbstractLayeredNeuralNetwork extends AbstractNeuralNetwork {
   
   protected LearningStyle learningStyle;
 
-  public static enum TrainingMethod {
-    GRADIENT_DESCENT
-  }
-  
-  public static enum LearningStyle {
-    UNSUPERVISED,
-    SUPERVISED
-  }
-  
   public AbstractLayeredNeuralNetwork() {
+    this.regularizationWeight = DEFAULT_REGULARIZATION_WEIGHT;
     this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT;
     this.trainingMethod = TrainingMethod.GRADIENT_DESCENT;
     this.learningStyle = LearningStyle.SUPERVISED;
@@ -81,6 +79,38 @@ abstract class AbstractLayeredNeuralNetwork extends AbstractNeuralNetwork {
     super(conf, modelPath);
   }
 
+  /**
+   * Set the regularization weight. Recommended range is [0, 0.1); the more
+   * complex the model, the smaller the regularization weight should be.
+   * 
+   * @param regularizationWeight
+   */
+  public void setRegularizationWeight(double regularizationWeight) {
+    Preconditions.checkArgument(regularizationWeight >= 0
+        && regularizationWeight < 1.0,
+        "Regularization weight must be in range [0, 1.0)");
+    this.regularizationWeight = regularizationWeight;
+  }
+
+  public double getRegularizationWeight() {
+    return this.regularizationWeight;
+  }
+
+  /**
+   * Set the momentum weight for the model. Recommended range is [0, 0.5].
+   * 
+   * @param momentumWeight
+   */
+  public void setMomemtumWeight(double momentumWeight) {
+    Preconditions.checkArgument(momentumWeight >= 0 && momentumWeight <= 1.0,
+        "Momentum weight must be in range [0, 1.0]");
+    this.momentumWeight = momentumWeight;
+  }
+
+  public double getMomemtumWeight() {
+    return this.momentumWeight;
+  }
+
   public void setTrainingMethod(TrainingMethod method) {
     this.trainingMethod = method;
   }
@@ -117,7 +147,6 @@ abstract class AbstractLayeredNeuralNetwork extends AbstractNeuralNetwork {
    *          is f(x) = x by default.
    * @return The layer index, starts with 0.
    */
-  @SuppressWarnings("rawtypes")
   public abstract int addLayer(int size, boolean isFinalLayer,
       DoubleFunction squashingFunction, Class<? extends Neuron> neuronClass);
 
@@ -182,6 +211,8 @@ abstract class AbstractLayeredNeuralNetwork extends AbstractNeuralNetwork {
   @Override
   public void readFields(DataInput input) throws IOException {
     super.readFields(input);
+    // read regularization weight
+    this.regularizationWeight = input.readDouble();
     // read momentum weight
     this.momentumWeight = input.readDouble();
 
@@ -203,6 +234,8 @@ abstract class AbstractLayeredNeuralNetwork extends AbstractNeuralNetwork {
   @Override
   public void write(DataOutput output) throws IOException {
     super.write(output);
+    // write regularization weight
+    output.writeDouble(this.regularizationWeight);
     // write momentum weight
     output.writeDouble(this.momentumWeight);
 

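A minimal usage sketch for the new setters, assuming a locally constructed LayeredNeuralNetwork; values outside the checked ranges fail the Preconditions checks shown above (this is illustrative only, not part of the commit):
```Java
import org.apache.horn.core.LayeredNeuralNetwork;

public class ParameterSetterSketch {
  public static void main(String[] args) {
    LayeredNeuralNetwork model = new LayeredNeuralNetwork();
    model.setRegularizationWeight(0.01); // accepted: within [0, 1.0)
    model.setMomemtumWeight(0.2);        // accepted: within [0, 1.0]
    // model.setRegularizationWeight(1.5); // would fail the Preconditions check
  }
}
```
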
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java b/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
index 45f56a3..5624e49 100644
--- a/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
@@ -22,6 +22,9 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -47,9 +50,10 @@ import com.google.common.io.Closeables;
  * 
  */
 public abstract class AbstractNeuralNetwork implements Writable {
+
   protected HamaConfiguration conf;
   protected FileSystem fs;
-
+  
   private static final double DEFAULT_LEARNING_RATE = 0.5;
 
   protected double learningRate;
@@ -68,21 +72,31 @@ public abstract class AbstractNeuralNetwork implements Writable {
     this.featureTransformer = new DefaultFeatureTransformer();
   }
 
-  public AbstractNeuralNetwork(String modelPath) {
-    this.modelPath = modelPath;
-  }
-
   public AbstractNeuralNetwork(HamaConfiguration conf, String modelPath) {
     try {
       this.conf = conf;
-      this.fs = FileSystem.get(conf);
       this.modelPath = modelPath;
-
       this.readFromModel();
     } catch (IOException e) {
       e.printStackTrace();
     }
+  }
+
+  /**
+   * Set the degree of aggressiveness during model training. A large learning
+   * rate can increase the training speed, but it also decreases the chance that
+   * the model converges. Recommended range is (0, 0.3).
+   * 
+   * @param learningRate
+   */
+  public void setLearningRate(double learningRate) {
+    Preconditions.checkArgument(learningRate > 0,
+        "Learning rate must be larger than 0.");
+    this.learningRate = learningRate;
+  }
 
+  public double getLearningRate() {
+    return this.learningRate;
   }
 
   public void isLearningRateDecay(boolean decay) {
@@ -102,19 +116,21 @@ public abstract class AbstractNeuralNetwork implements Writable {
    * @throws ClassNotFoundException 
    * @throws IOException
    */
-  public BSPJob train(Configuration conf) throws ClassNotFoundException, IOException, InterruptedException {
+  public BSPJob train(HamaConfiguration conf) throws ClassNotFoundException, IOException, InterruptedException {
     Preconditions.checkArgument(this.modelPath != null,
         "Please set the model path before training.");
-
     // train with BSP job
-    return trainInternal((HamaConfiguration) conf);
+      return trainInternal(conf);
   }
 
   /**
    * Train the model with the path of given training data and parameters.
+   * 
+   * @param conf
+   *          the Hama job configuration
    */
-  protected abstract BSPJob trainInternal(HamaConfiguration hamaConf)
-      throws IOException, InterruptedException, ClassNotFoundException;
+  protected abstract BSPJob trainInternal(HamaConfiguration conf) throws IOException,
+      InterruptedException, ClassNotFoundException;
 
   /**
    * Read the model meta-data from the specified location.
@@ -124,9 +140,18 @@ public abstract class AbstractNeuralNetwork implements Writable {
   protected void readFromModel() throws IOException {
     Preconditions.checkArgument(this.modelPath != null,
         "Model path has not been set.");
-    FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath)));
-    this.readFields(is);
-    Closeables.close(is, false);
+    Configuration conf = new Configuration();
+    FSDataInputStream is = null;
+    try {
+      URI uri = new URI(this.modelPath);
+      FileSystem fs = FileSystem.get(uri, conf);
+      is = new FSDataInputStream(fs.open(new Path(modelPath)));
+      this.readFields(is);
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    } finally {
+      Closeables.close(is, false);
+    }
   }
 
   /**
@@ -137,9 +162,16 @@ public abstract class AbstractNeuralNetwork implements Writable {
   public void writeModelToFile() throws IOException {
     Preconditions.checkArgument(this.modelPath != null,
         "Model path has not been set.");
-
-    FSDataOutputStream is = fs.create(new Path(this.modelPath), true);
-    this.write(is);
+    Configuration conf = new Configuration();
+    FSDataOutputStream is = null;
+    try {
+      URI uri = new URI(this.modelPath);
+      FileSystem fs = FileSystem.get(uri, conf);
+      is = fs.create(new Path(this.modelPath), true);
+      this.write(is);
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
 
     Closeables.close(is, false);
   }

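Model I/O now resolves the FileSystem from the model path's URI rather than a handle cached in the constructor, so models can live on any supported scheme. A standalone sketch of that resolution pattern, using a hypothetical HDFS path (illustrative only):
```Java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ModelPathSketch {
  public static void main(String[] args) throws Exception {
    String modelPath = "hdfs://localhost:9000/tmp/model"; // hypothetical path
    Configuration conf = new Configuration();
    // Resolve the FileSystem from the path's scheme (hdfs://, file://, ...)
    FileSystem fs = FileSystem.get(new URI(modelPath), conf);
    System.out.println("Resolved file system: " + fs.getUri());
  }
}
```
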
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/core/AutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AutoEncoder.java b/src/main/java/org/apache/horn/core/AutoEncoder.java
index 35a6f66..1b7a406 100644
--- a/src/main/java/org/apache/horn/core/AutoEncoder.java
+++ b/src/main/java/org/apache/horn/core/AutoEncoder.java
@@ -28,6 +28,7 @@ import org.apache.hama.commons.math.DoubleFunction;
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
 import org.apache.hama.ml.util.FeatureTransformer;
+import org.apache.horn.core.Constants.LearningStyle;
 import org.apache.horn.funcs.FunctionFactory;
 
 import com.google.common.base.Preconditions;
@@ -59,7 +60,7 @@ public class AutoEncoder {
     model.addLayer(inputDimensions, true,
         FunctionFactory.createDoubleFunction("Sigmoid"), null);
     model
-        .setLearningStyle(AbstractLayeredNeuralNetwork.LearningStyle.UNSUPERVISED);
+        .setLearningStyle(LearningStyle.UNSUPERVISED);
     model.setCostFunction(FunctionFactory
         .createDoubleDoubleFunction("SquaredError"));
   }

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/core/Constants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/Constants.java b/src/main/java/org/apache/horn/core/Constants.java
new file mode 100644
index 0000000..09883af
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/Constants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+public class Constants {
+  
+  public static enum LearningStyle {
+    UNSUPERVISED,
+    SUPERVISED
+  }
+  
+  public static enum TrainingMethod {
+    GRADIENT_DESCENT
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/core/HornJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/HornJob.java b/src/main/java/org/apache/horn/core/HornJob.java
index c95ae36..30e9e88 100644
--- a/src/main/java/org/apache/horn/core/HornJob.java
+++ b/src/main/java/org/apache/horn/core/HornJob.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.commons.math.Function;
+import org.apache.horn.core.Constants.LearningStyle;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.funcs.FunctionFactory;
 
 public class HornJob extends BSPJob {
@@ -77,16 +79,28 @@ public class HornJob extends BSPJob {
     this.conf.setInt("training.batch.size", batchSize);
   }
 
+  public void setTrainingMethod(TrainingMethod method) {
+    this.neuralNetwork.setTrainingMethod(method);
+  }
+  
+  public void setLearningStyle(LearningStyle style) {
+    this.neuralNetwork.setLearningStyle(style);
+  }
+  
   public void setLearningRate(double learningRate) {
-    this.conf.setDouble("mlp.learning.rate", learningRate);
+    this.neuralNetwork.setLearningRate(learningRate);
   }
-
+  
   public void setConvergenceCheckInterval(int n) {
     this.conf.setInt("convergence.check.interval", n);
   }
 
   public void setMomentumWeight(double momentumWeight) {
-    this.conf.setDouble("mlp.momentum.weight", momentumWeight);
+    this.neuralNetwork.setMomemtumWeight(momentumWeight);
+  }
+  
+  public void setRegularizationWeight(double regularizationWeight) {
+    this.neuralNetwork.setRegularizationWeight(regularizationWeight);
   }
 
   public LayeredNeuralNetwork getNeuralNetwork() {
@@ -95,7 +109,7 @@ public class HornJob extends BSPJob {
 
   public boolean waitForCompletion(boolean verbose) throws IOException,
       InterruptedException, ClassNotFoundException {
-    BSPJob job = neuralNetwork.train(this.conf);
+    BSPJob job = neuralNetwork.train((HamaConfiguration) this.conf);
     if (verbose) {
       return job.waitForCompletion(true);
     } else {
@@ -103,10 +117,6 @@ public class HornJob extends BSPJob {
     }
   }
 
-  public void setRegularizationWeight(double regularizationWeight) {
-    this.conf.setDouble("regularization.weight", regularizationWeight);
-  }
-
   public void setModelPath(String modelPath) {
     this.conf.set("model.path", modelPath);
     neuralNetwork.setModelPath(modelPath);

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
index c858d11..fe5d3a3 100644
--- a/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.commons.io.MatrixWritable;
@@ -44,6 +43,8 @@ import org.apache.hama.commons.math.DoubleFunction;
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
 import org.apache.hama.util.ReflectionUtils;
+import org.apache.horn.core.Constants.LearningStyle;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.funcs.FunctionFactory;
 
 import com.google.common.base.Preconditions;
@@ -75,11 +76,9 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
   protected List<DoubleFunction> squashingFunctionList;
 
   protected List<Class<? extends Neuron>> neuronClassList;
-
+  
   protected int finalLayerIdx;
 
-  protected double regularizationWeight;
-
   public LayeredNeuralNetwork() {
     this.layerSizeList = Lists.newArrayList();
     this.weightMatrixList = Lists.newArrayList();
@@ -90,7 +89,6 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
 
   public LayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
     super(conf, modelPath);
-    this.regularizationWeight = conf.getDouble("regularization.weight", 0);
   }
 
   @Override
@@ -107,17 +105,6 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
 
     this.layerSizeList.add(size);
     int layerIdx = this.layerSizeList.size() - 1;
-    
-    if(layerIdx == 0) {
-      LOG.info("Input Layer: " + (size - 1) + " features");
-    } else {
-      if(!isFinalLayer) {
-        LOG.info("Hidden Layer: " + (size - 1) + " neurons with 1 bias");
-      } else {
-        LOG.info("Output Layer: " + size);
-      }
-    }
-    
     if (isFinalLayer) {
       this.finalLayerIdx = layerIdx;
     }
@@ -165,7 +152,6 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
 
   /**
    * Set the previous weight matrices.
-   * 
    * @param prevUpdates
    */
   void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) {
@@ -277,12 +263,12 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
     for (Class<? extends Neuron> clazz : this.neuronClassList) {
       output.writeUTF(clazz.getName());
     }
-
+    
     // write squashing functions
     output.writeInt(this.squashingFunctionList.size());
     for (DoubleFunction aSquashingFunctionList : this.squashingFunctionList) {
-      WritableUtils.writeString(output,
-          aSquashingFunctionList.getFunctionName());
+      WritableUtils.writeString(output, aSquashingFunctionList
+              .getFunctionName());
     }
 
     // write weight matrices
@@ -344,10 +330,9 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       intermediateOutput = forward(i, intermediateOutput);
       outputCache.add(intermediateOutput);
     }
-
     return outputCache;
   }
-
+  
   /**
    * @param neuronClass
    * @return a new neuron instance
@@ -369,7 +354,10 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
   protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) {
     DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
 
-    // TODO use the multithread processing
+    // LOG.info("intermediate: " + intermediateOutput.toString());
+    // DoubleVector vec = weightMatrix.multiplyVectorUnsafe(intermediateOutput);
+    // vec = vec.applyToElements(this.squashingFunctionList.get(fromLayer));
+   
     DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
     for (int row = 0; row < weightMatrix.getRowCount(); row++) {
       List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
@@ -390,14 +378,13 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       }
       vec.set(row, n.getOutput());
     }
-
+    
     // add bias
     DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
     vecWithBias.set(0, 1);
     for (int i = 0; i < vec.getDimension(); ++i) {
       vecWithBias.set(i + 1, vec.get(i));
     }
-
     return vecWithBias;
   }
 
@@ -520,7 +507,6 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
     }
 
     this.setPrevWeightMatrices(weightUpdateMatrices);
-
     return weightUpdateMatrices;
   }
 
@@ -539,6 +525,8 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       DenseDoubleMatrix weightUpdateMatrix) {
 
     // get layer related information
+    DoubleFunction squashingFunction = this.squashingFunctionList
+        .get(curLayerIdx);
     DoubleVector curLayerOutput = outputCache.get(curLayerIdx);
     DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
     DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
@@ -548,10 +536,10 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       nextLayerDelta = nextLayerDelta.slice(1,
           nextLayerDelta.getDimension() - 1);
     }
-
-    // DoubleMatrix transposed = weightMatrix.transpose();
+    
     DoubleVector deltaVector = new DenseDoubleVector(
         weightMatrix.getColumnCount());
+    
     for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
       Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance(this.neuronClassList
           .get(curLayerIdx));
@@ -559,7 +547,7 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       n.setLearningRate(this.learningRate);
       n.setMomentumWeight(this.momentumWeight);
 
-      n.setSquashingFunction(this.squashingFunctionList.get(curLayerIdx));
+      n.setSquashingFunction(squashingFunction);
       n.setOutput(curLayerOutput.get(row));
 
       List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
@@ -567,7 +555,6 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
       n.setWeightVector(weightMatrix.getRowCount());
 
       for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
-        // sum += (transposed.get(row, col) * nextLayerDelta.get(col));
         msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
             new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
                 weightMatrix.get(col, row)), new DoubleWritable(
@@ -581,7 +568,7 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
         // TODO Auto-generated catch block
         e.printStackTrace();
       }
-
+      
       // update weights
       weightUpdateMatrix.setColumn(row, n.getWeights());
       deltaVector.set(row, n.getDelta());
@@ -591,9 +578,9 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
   }
 
   @Override
-  protected BSPJob trainInternal(HamaConfiguration hamaConf)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    this.conf = hamaConf;
+  protected BSPJob trainInternal(HamaConfiguration conf) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    this.conf = conf;
     this.fs = FileSystem.get(conf);
 
     String modelPath = conf.get("model.path");
@@ -614,11 +601,6 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
     job.setJarByClass(LayeredNeuralNetworkTrainer.class);
     job.setBspClass(LayeredNeuralNetworkTrainer.class);
 
-    // additional for parameter server
-    // TODO at this moment, we use 1 task as a parameter server
-    // In the future, the number of parameter server should be configurable
-    job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
-
     job.setInputPath(new Path(conf.get("training.input.path")));
     job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
     job.setInputKeyClass(LongWritable.class);
@@ -646,5 +628,5 @@ public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
   public DoubleFunction getSquashingFunction(int idx) {
     return this.squashingFunctionList.get(idx);
   }
-
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
index 68287ad..ce6d6e4 100644
--- a/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
@@ -18,8 +18,6 @@
 package org.apache.horn.core;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,9 +31,6 @@ import org.apache.hama.commons.io.VectorWritable;
 import org.apache.hama.commons.math.DenseDoubleMatrix;
 import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ipc.RPC;
-
-import com.google.common.base.Preconditions;
 
 /**
  * The trainer that train the {@link LayeredNeuralNetwork} based on BSP
@@ -46,39 +41,23 @@ public final class LayeredNeuralNetworkTrainer
     extends
     BSP<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> {
 
-  private static final Log LOG = LogFactory
-      .getLog(LayeredNeuralNetworkTrainer.class);
-
-  /* When given peer is master worker: base of parameter merge */
-  /* When given peer is slave worker: neural network for training */
+  private static final Log LOG = LogFactory.getLog(LayeredNeuralNetworkTrainer.class);
+  
   private LayeredNeuralNetwork inMemoryModel;
-
-  /* Job configuration */
   private HamaConfiguration conf;
-
   /* Default batch size */
   private int batchSize;
 
-  /* whether it is converging or not */
-  private AtomicBoolean isConverge;
+  /* training error tracked across convergence check intervals */
+  private double prevAvgTrainingError;
+  private double curAvgTrainingError;
+  private long convergenceCheckInterval;
+  private long iterations;
+  private long maxIterations;
+  private long epoch;
+  private boolean isConverge;
 
-  /* When given peer is master worker: Asynchronous parameter merger */
-  /* When given peer is slave worker: null */
-  private RPC.Server merger;
-
-  /* When given peer is master worker: null */
-  /* When given peer is slave worker: proxy to Asynchronous parameter merger */
-  private ParameterMerger proxy;
-
-  /**
-   * Returns true if this worker is master worker.
-   *
-   * @param peer
-   * */
-  private boolean isMaster(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
-    return peer.getPeerIndex() == peer.getNumPeers() - 1;
-  }
+  private String modelPath;
 
   @Override
   /**
@@ -86,43 +65,20 @@ public final class LayeredNeuralNetworkTrainer
    */
   public void setup(
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
-    // At least one master & slave worker exist.
-    Preconditions.checkArgument(peer.getNumPeers() >= 2);
+    if (peer.getPeerIndex() == 0) {
+      LOG.info("Begin to train");
+    }
+    this.isConverge = false;
     this.conf = peer.getConfiguration();
-
-    String modelPath = conf.get("model.path");
+    this.iterations = 0;
+    this.epoch = 0;
+    this.modelPath = conf.get("model.path");
+    this.maxIterations = conf.getLong("training.max.iterations", 100000);
+    this.convergenceCheckInterval = conf.getLong("convergence.check.interval",
+        100);
     this.inMemoryModel = new LayeredNeuralNetwork(conf, modelPath);
-
+    this.prevAvgTrainingError = Integer.MAX_VALUE;
     this.batchSize = conf.getInt("training.batch.size", 50);
-    this.isConverge = new AtomicBoolean(false);
-
-    int slaveCount = peer.getNumPeers() - 1;
-    int mergeLimit = conf.getInt("training.max.iterations", 100000);
-    int convergenceCheckInterval = peer.getNumPeers()
-        * conf.getInt("convergence.check.interval", 2000);
-    String master = peer.getPeerName();
-    String masterAddr = master.substring(0, master.indexOf(':'));
-    int port = conf.getInt("sync.server.port", 40089);
-
-    if (isMaster(peer)) {
-      try {
-        this.merger = RPC.getServer(new ParameterMergerServer(inMemoryModel,
-            isConverge, slaveCount, mergeLimit, convergenceCheckInterval),
-            masterAddr, port, slaveCount, false, conf);
-        merger.start();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-      LOG.info("Begin to train");
-    } else {
-      InetSocketAddress addr = new InetSocketAddress(masterAddr, port);
-      try {
-        this.proxy = (ParameterMerger) RPC.getProxy(ParameterMerger.class,
-            ParameterMerger.versionID, addr, conf);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
   }
 
   @Override
@@ -132,9 +88,12 @@ public final class LayeredNeuralNetworkTrainer
   public void cleanup(
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
     // write model to modelPath
-    if (isMaster(peer)) {
+    if (peer.getPeerIndex() == 0) {
       try {
-        LOG.info("Write model back to " + inMemoryModel.getModelPath());
+        LOG.info(String.format("End of training, number of iterations: %d.\n",
+            this.iterations));
+        LOG.info(String.format("Write model back to %s\n",
+            inMemoryModel.getModelPath()));
         this.inMemoryModel.writeModelToFile();
       } catch (IOException e) {
         e.printStackTrace();
@@ -146,19 +105,20 @@ public final class LayeredNeuralNetworkTrainer
   public void bsp(
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
       throws IOException, SyncException, InterruptedException {
-    while (!this.isConverge.get()) {
-      // each slave-worker calculate the matrices updates according to local
-      // data
-      // and merge them with master
-      if (!isMaster(peer)) {
-        calculateUpdates(peer);
+    while (this.iterations++ < maxIterations) {
+      // each groom calculates the matrix updates from its local data
+      calculateUpdates(peer);
+      peer.sync();
+
+      // the master merges the received updates into the model
+      if (peer.getPeerIndex() == 0) {
+        mergeUpdates(peer);
+      }
+      peer.sync();
+      if (this.isConverge) {
+        break;
       }
     }
-    peer.sync();
-    if (isMaster(peer)) {
-      merger.stop();
-    }
-    peer.sync(); // finalize the bsp program.
   }
 
   /**
@@ -170,6 +130,19 @@ public final class LayeredNeuralNetworkTrainer
   private void calculateUpdates(
       BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
       throws IOException {
+    // receive update information from master
+    if (peer.getNumCurrentMessages() != 0) {
+      ParameterMessage inMessage = peer.getCurrentMessage();
+      DoubleMatrix[] newWeights = inMessage.getCurMatrices();
+      DoubleMatrix[] preWeightUpdates = inMessage.getPrevMatrices();
+      this.inMemoryModel.setWeightMatrices(newWeights);
+      this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates);
+      this.isConverge = inMessage.isConverge();
+      // check converge
+      if (isConverge) {
+        return;
+      }
+    }
 
     DoubleMatrix[] weightUpdates = new DoubleMatrix[this.inMemoryModel.weightMatrixList
         .size()];
@@ -186,6 +159,10 @@ public final class LayeredNeuralNetworkTrainer
     for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) {
       if (!peer.readNext(key, value)) {
         peer.reopenInput();
+        if (peer.getPeerIndex() == 0) {
+          epoch++;
+          LOG.info("Training loss: " + curAvgTrainingError + " at " + (epoch) + " epoch.");
+        }
         peer.readNext(key, value);
       }
       DoubleVector trainingInstance = value.getVector();
@@ -200,17 +177,78 @@ public final class LayeredNeuralNetworkTrainer
       weightUpdates[i] = weightUpdates[i].divide(batchSize);
     }
 
-    // exchange parameter update with master
-    ParameterMessage msg = new ParameterMessage(
-        avgTrainingError, false, weightUpdates,
-        this.inMemoryModel.getPrevMatricesUpdates());
-
-    ParameterMessage inMessage = proxy.merge(msg);
-    DoubleMatrix[] newWeights = inMessage.getCurMatrices();
-    DoubleMatrix[] preWeightUpdates = inMessage.getPrevMatrices();
-    this.inMemoryModel.setWeightMatrices(newWeights);
-    this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates);
-    this.isConverge.set(inMessage.isConverge());
+    DoubleMatrix[] prevWeightUpdates = this.inMemoryModel
+        .getPrevMatricesUpdates();
+    ParameterMessage outMessage = new ParameterMessage(
+        avgTrainingError, false, weightUpdates, prevWeightUpdates);
+    peer.send(peer.getPeerName(0), outMessage);
+  }
+
+  /**
+   * Merge the weight updates received from the grooms.
+   * 
+   * @param peer
+   * @throws IOException
+   */
+  private void mergeUpdates(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
+      throws IOException {
+    int numMessages = peer.getNumCurrentMessages();
+    boolean isConverge = false;
+    if (numMessages == 0) { // converges
+      isConverge = true;
+      return;
+    }
+
+    double avgTrainingError = 0;
+    DoubleMatrix[] matricesUpdates = null;
+    DoubleMatrix[] prevMatricesUpdates = null;
+
+    while (peer.getNumCurrentMessages() > 0) {
+      ParameterMessage message = peer.getCurrentMessage();
+      if (matricesUpdates == null) {
+        matricesUpdates = message.getCurMatrices();
+        prevMatricesUpdates = message.getPrevMatrices();
+      } else {
+        LayeredNeuralNetwork.matricesAdd(matricesUpdates,
+            message.getCurMatrices());
+        LayeredNeuralNetwork.matricesAdd(prevMatricesUpdates,
+            message.getPrevMatrices());
+      }
+      
+      avgTrainingError += message.getTrainingError();
+    }
+
+    if (numMessages > 1) {
+      avgTrainingError /= numMessages;
+      for (int i = 0; i < matricesUpdates.length; ++i) {
+        matricesUpdates[i] = matricesUpdates[i].divide(numMessages);
+        prevMatricesUpdates[i] = prevMatricesUpdates[i].divide(numMessages);
+      }
+    }
+
+    this.inMemoryModel.updateWeightMatrices(matricesUpdates);
+    this.inMemoryModel.setPrevWeightMatrices(prevMatricesUpdates);
+    
+    // check convergence
+    if (iterations % convergenceCheckInterval == 0) {
+      if (prevAvgTrainingError < curAvgTrainingError) {
+        // error cannot decrease any more
+        isConverge = true;
+      }
+      // update
+      prevAvgTrainingError = curAvgTrainingError;
+      curAvgTrainingError = 0;
+    }
+    curAvgTrainingError += avgTrainingError / convergenceCheckInterval;
+
+    // broadcast updated weight matrices
+    for (String peerName : peer.getAllPeerNames()) {
+      ParameterMessage msg = new ParameterMessage(
+          0, isConverge, this.inMemoryModel.getWeightMatrices(),
+          this.inMemoryModel.getPrevMatricesUpdates());
+      peer.send(peerName, msg);
+    }
   }
 
 }

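With the parameter-server RPC removed, every groom now computes its mini-batch updates, sends them to peer 0, and peer 0 averages them and broadcasts the merged model each superstep. A simplified, standalone sketch of the averaging step, using plain 2-D arrays in place of Hama's DoubleMatrix (illustrative only):
```Java
import java.util.Arrays;
import java.util.List;

public class MergeUpdatesSketch {

  // Element-wise average of per-worker weight updates, as mergeUpdates does.
  // Assumes a non-empty list of equally sized matrices.
  static double[][] average(List<double[][]> workerUpdates) {
    double[][] merged = null;
    for (double[][] update : workerUpdates) {
      if (merged == null) {
        merged = new double[update.length][];
        for (int i = 0; i < update.length; i++)
          merged[i] = Arrays.copyOf(update[i], update[i].length);
      } else {
        for (int i = 0; i < update.length; i++)
          for (int j = 0; j < update[i].length; j++)
            merged[i][j] += update[i][j];
      }
    }
    int n = workerUpdates.size();
    for (double[] row : merged)
      for (int j = 0; j < row.length; j++)
        row[j] /= n;
    return merged;
  }

  public static void main(String[] args) {
    double[][] w1 = { { 0.2, 0.4 }, { 0.6, 0.8 } };
    double[][] w2 = { { 0.4, 0.2 }, { 0.2, 0.0 } };
    // prints the element-wise average of the two workers' updates
    System.out.println(Arrays.deepToString(average(Arrays.asList(w1, w2))));
  }
}
```
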
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/core/ParameterMergerServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/ParameterMergerServer.java b/src/main/java/org/apache/horn/core/ParameterMergerServer.java
index c76a4d0..7bd5543 100644
--- a/src/main/java/org/apache/horn/core/ParameterMergerServer.java
+++ b/src/main/java/org/apache/horn/core/ParameterMergerServer.java
@@ -89,15 +89,15 @@ public class ParameterMergerServer implements ParameterMerger {
     LOG.info("Start merging: " + this.mergeCount);
 
     if (!this.isConverge.get()) {
-      for (int i = 0; i < weightUpdates.length; ++i) {
-        weightUpdates[i] = weightUpdates[i].divide(this.SlaveCount);
-        prevWeightUpdates[i] = prevWeightUpdates[i].divide(this.SlaveCount);
-      }
-
       synchronized (inMemoryModel) {
-        this.inMemoryModel.updateWeightMatrices(weightUpdates);
-        this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
 
+        LOG.info(">>>> before: " + this.inMemoryModel.getWeightMatrices()[0].get(0, 0));
+        
+        // this.inMemoryModel.addWeights(weightUpdates);
+        // this.inMemoryModel.addPrevWeights(prevWeightUpdates);
+        
+        LOG.info(", after: " + this.inMemoryModel.getWeightMatrices()[0].get(0, 0));
+        
         // add trainingError to trainingErrors
         this.trainingErrors[this.curTrainingError++] = trainingError;
 

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/core/ParameterMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/ParameterMessage.java b/src/main/java/org/apache/horn/core/ParameterMessage.java
index 3905e25..524c443 100644
--- a/src/main/java/org/apache/horn/core/ParameterMessage.java
+++ b/src/main/java/org/apache/horn/core/ParameterMessage.java
@@ -39,6 +39,8 @@ public class ParameterMessage implements Writable {
   protected boolean converge;
 
   public ParameterMessage() {
+    this.converge = false;
+    this.trainingError = 0.0d;
   }
 
   public ParameterMessage(double trainingError, boolean converge,
@@ -53,15 +55,19 @@ public class ParameterMessage implements Writable {
   public void readFields(DataInput input) throws IOException {
     trainingError = input.readDouble();
     converge = input.readBoolean();
-    int numMatrices = input.readInt();
-    boolean hasPrevMatrices = input.readBoolean();
-    curMatrices = new DenseDoubleMatrix[numMatrices];
-    // read matrice updates
-    for (int i = 0; i < curMatrices.length; ++i) {
-      curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+    boolean hasCurMatrices = input.readBoolean();
+    if(hasCurMatrices) {
+      int numMatrices = input.readInt();
+      curMatrices = new DenseDoubleMatrix[numMatrices];
+      // read matrix updates
+      for (int i = 0; i < curMatrices.length; ++i) {
+        curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+      }
     }
-
+    
+    boolean hasPrevMatrices = input.readBoolean();
     if (hasPrevMatrices) {
+      int numMatrices = input.readInt();
       prevMatrices = new DenseDoubleMatrix[numMatrices];
       // read previous matrices updates
       for (int i = 0; i < prevMatrices.length; ++i) {
@@ -74,16 +80,21 @@ public class ParameterMessage implements Writable {
   public void write(DataOutput output) throws IOException {
     output.writeDouble(trainingError);
     output.writeBoolean(converge);
-    output.writeInt(curMatrices.length);
-    if (prevMatrices == null) {
+    if (curMatrices == null) {
       output.writeBoolean(false);
     } else {
       output.writeBoolean(true);
+      output.writeInt(curMatrices.length);
+      for (DoubleMatrix matrix : curMatrices) {
+        MatrixWritable.write(matrix, output);
+      }
     }
-    for (DoubleMatrix matrix : curMatrices) {
-      MatrixWritable.write(matrix, output);
-    }
-    if (prevMatrices != null) {
+    
+    if (prevMatrices == null) {
+      output.writeBoolean(false);
+    } else {
+      output.writeBoolean(true);
+      output.writeInt(prevMatrices.length);
       for (DoubleMatrix matrix : prevMatrices) {
         MatrixWritable.write(matrix, output);
       }

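ParameterMessage now writes a presence flag (and a length only when present) for each matrix array, so a null curMatrices or prevMatrices round-trips safely. A self-contained sketch of the same flag-then-length pattern with plain double arrays:
```Java
import java.io.*;

public class NullableArraySketch {

  static void write(DataOutput out, double[] values) throws IOException {
    if (values == null) {
      out.writeBoolean(false);            // presence flag only
    } else {
      out.writeBoolean(true);
      out.writeInt(values.length);        // length written only when present
      for (double v : values) out.writeDouble(v);
    }
  }

  static double[] read(DataInput in) throws IOException {
    if (!in.readBoolean()) return null;   // mirror the flag written above
    double[] values = new double[in.readInt()];
    for (int i = 0; i < values.length; i++) values[i] = in.readDouble();
    return values;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    write(out, new double[] { 1.0, 2.0 });
    write(out, null);
    DataInput in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    System.out.println(java.util.Arrays.toString(read(in))); // [1.0, 2.0]
    System.out.println(read(in));                            // null
  }
}
```
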
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
index c24fa16..4c0df95 100644
--- a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
+++ b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hama.HamaConfiguration;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.core.HornJob;
 import org.apache.horn.core.Neuron;
 import org.apache.horn.core.Synapse;
@@ -48,40 +49,43 @@ public class MultiLayerPerceptron {
     public void backward(
         Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
         throws IOException {
+      double gradient = 0;
       for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
         // Calculates error gradient for each neuron
-        double gradient = this.squashingFunction.applyDerivative(this
-            .getOutput()) * (m.getDelta() * m.getWeight());
-        this.backpropagate(gradient);
+        gradient += (m.getDelta() * m.getWeight());
 
         // Weight corrections
         double weight = -this.getLearningRate() * this.getOutput()
             * m.getDelta() + this.getMomentumWeight() * m.getPrevWeight();
         this.push(weight);
       }
+
+      this.backpropagate(gradient
+          * this.squashingFunction.applyDerivative(this.getOutput()));
     }
   }
 
   public static HornJob createJob(HamaConfiguration conf, String modelPath,
       String inputPath, double learningRate, double momemtumWeight,
-      double regularizationWeight, int features, int labels, int maxIteration,
-      int numOfTasks) throws IOException {
+      double regularizationWeight, int features, int hu, int labels,
+      int miniBatch, int maxIteration) throws IOException {
 
     HornJob job = new HornJob(conf, MultiLayerPerceptron.class);
     job.setTrainingSetPath(inputPath);
     job.setModelPath(modelPath);
 
-    job.setNumBspTask(numOfTasks);
     job.setMaxIteration(maxIteration);
     job.setLearningRate(learningRate);
     job.setMomentumWeight(momemtumWeight);
     job.setRegularizationWeight(regularizationWeight);
-
-    job.setConvergenceCheckInterval(1000);
-    job.setBatchSize(300);
+    
+    job.setConvergenceCheckInterval(600);
+    job.setBatchSize(miniBatch);
+    
+    job.setTrainingMethod(TrainingMethod.GRADIENT_DESCENT);
 
     job.inputLayer(features, Sigmoid.class, StandardNeuron.class);
-    job.addLayer(15, Sigmoid.class, StandardNeuron.class);
+    job.addLayer(hu, Sigmoid.class, StandardNeuron.class);
     job.outputLayer(labels, Sigmoid.class, StandardNeuron.class);
 
     job.setCostFunction(CrossEntropy.class);
@@ -92,18 +96,18 @@ public class MultiLayerPerceptron {
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException {
     if (args.length < 9) {
-      System.out
-          .println("Usage: <MODEL_PATH> <INPUT_PATH> "
-              + "<LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT> "
-              + "<FEATURE_DIMENSION> <LABEL_DIMENSION> <MAX_ITERATION> <NUM_TASKS>");
-      System.exit(1);
+      System.out.println("Usage: <MODEL_PATH> <INPUT_PATH> "
+          + "<LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT> "
+          + "<FEATURE_DIMENSION> <HIDDEN_UNITS> <LABEL_DIMENSION> "
+          + "<BATCH_SIZE> <MAX_ITERATION>");
+      System.exit(-1);
     }
 
     HornJob ann = createJob(new HamaConfiguration(), args[0], args[1],
         Double.parseDouble(args[2]), Double.parseDouble(args[3]),
         Double.parseDouble(args[4]), Integer.parseInt(args[5]),
         Integer.parseInt(args[6]), Integer.parseInt(args[7]),
-        Integer.parseInt(args[8]));
+        Integer.parseInt(args[8]), Integer.parseInt(args[9]));
 
     long startTime = System.currentTimeMillis();
     if (ann.waitForCompletion(true)) {

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/funcs/CrossEntropy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/funcs/CrossEntropy.java b/src/main/java/org/apache/horn/funcs/CrossEntropy.java
index 567db29..7cc5e6a 100644
--- a/src/main/java/org/apache/horn/funcs/CrossEntropy.java
+++ b/src/main/java/org/apache/horn/funcs/CrossEntropy.java
@@ -32,27 +32,21 @@ public class CrossEntropy extends DoubleDoubleFunction {
   @Override
   public double apply(double target, double actual) {
     double adjustedTarget = (target == 0 ? 0.000001 : target);
-    adjustedTarget = (target == 1.0 ? 0.999999 : target);
+    adjustedTarget = (target == 1.0 ? 0.999999 : adjustedTarget);
     double adjustedActual = (actual == 0 ? 0.000001 : actual);
-    adjustedActual = (actual == 1 ? 0.999999 : actual);
+    adjustedActual = (actual == 1 ? 0.999999 : adjustedActual);
+    
     return -adjustedTarget * Math.log(adjustedActual) - (1 - adjustedTarget)
         * Math.log(1 - adjustedActual);
   }
 
   @Override
   public double applyDerivative(double target, double actual) {
-    double adjustedTarget = target;
-    double adjustedActual = actual;
-    if (adjustedActual == 1) {
-      adjustedActual = 0.999;
-    } else if (actual == 0) {
-      adjustedActual = 0.001;
-    }
-    if (adjustedTarget == 1) {
-      adjustedTarget = 0.999;
-    } else if (adjustedTarget == 0) {
-      adjustedTarget = 0.001;
-    }
+    double adjustedTarget = (target == 0 ? 0.000001 : target);
+    adjustedTarget = (target == 1.0 ? 0.999999 : adjustedTarget);
+    double adjustedActual = (actual == 0 ? 0.000001 : actual);
+    adjustedActual = (actual == 1 ? 0.999999 : adjustedActual);
+    
     return -adjustedTarget / adjustedActual + (1 - adjustedTarget)
         / (1 - adjustedActual);
   }

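The CrossEntropy change matters because the old code's second clamp assignment reused the raw target/actual value and discarded the first clamp, so an input of exactly 0 or 1 could still reach Math.log (or the division in the derivative) unclamped. A quick check of the fixed clamping at the boundaries, using the same 1e-6 clamp as above (illustrative only):
```Java
public class CrossEntropySketch {

  // same clamping as the fixed apply(): pull values away from exactly 0 and 1
  static double clamp(double x) {
    double y = (x == 0 ? 0.000001 : x);
    return (x == 1.0 ? 0.999999 : y);
  }

  // cross entropy: -t * log(a) - (1 - t) * log(1 - a)
  static double crossEntropy(double target, double actual) {
    double t = clamp(target), a = clamp(actual);
    return -t * Math.log(a) - (1 - t) * Math.log(1 - a);
  }

  public static void main(String[] args) {
    System.out.println(crossEntropy(0.0, 0.000001)); // near zero: prediction matches label
    System.out.println(crossEntropy(0.0, 0.999999)); // large: confident wrong prediction
  }
}
```
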
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/utils/MNISTConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/utils/MNISTConverter.java b/src/main/java/org/apache/horn/utils/MNISTConverter.java
index 3a2f8b8..6bbe891 100644
--- a/src/main/java/org/apache/horn/utils/MNISTConverter.java
+++ b/src/main/java/org/apache/horn/utils/MNISTConverter.java
@@ -33,9 +33,13 @@ public class MNISTConverter {
 
   private static int PIXELS = 28 * 28;
 
+  private static double rescale(double x) {
+    return 1 - (255 - x) / 255;
+  }
+
   public static void main(String[] args) throws Exception {
     if (args.length < 3) {
-      System.out.println("Usage: TRAINING_DATA LABELS_DATA OUTPUT_PATH");
+      System.out.println("Usage: <TRAINING_DATA> <LABELS_DATA> <OUTPUT_PATH>");
       System.out
           .println("ex) train-images.idx3-ubyte train-labels.idx1-ubyte /tmp/mnist.seq");
       System.exit(1);
@@ -76,14 +80,15 @@ public class MNISTConverter {
     for (int i = 0; i < count; i++) {
       double[] vals = new double[PIXELS + 10];
       for (int j = 0; j < PIXELS; j++) {
-        vals[j] = (images[i][j] & 0xff);
+        vals[j] = rescale((images[i][j] & 0xff));
       }
       int label = (labels[i] & 0xff);
+      // encode the label as a one-hot vector
       for (int j = 0; j < 10; j++) {
         if (j == label)
-          vals[PIXELS + j] = 1;
+          vals[PIXELS + j] = 1.0;
         else
-          vals[PIXELS + j] = 0;
+          vals[PIXELS + j] = 0.0;
       }
 
       writer.append(new LongWritable(), new VectorWritable(
@@ -94,4 +99,5 @@ public class MNISTConverter {
     labelsIn.close();
     writer.close();
   }
+
 }

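The new rescale helper, 1 - (255 - x) / 255, is algebraically just x / 255: it maps a raw pixel byte in [0, 255] onto [0, 1] before the one-hot label is appended. A tiny check (illustrative only):
```Java
public class RescaleSketch {

  static double rescale(double x) {
    return 1 - (255 - x) / 255; // algebraically equal to x / 255
  }

  public static void main(String[] args) {
    System.out.println(rescale(0));   // 0.0, minimum intensity
    System.out.println(rescale(255)); // 1.0, maximum intensity
    System.out.println(rescale(128)); // ~0.502
  }
}
```
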
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/main/java/org/apache/horn/utils/MNISTEvaluator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/utils/MNISTEvaluator.java b/src/main/java/org/apache/horn/utils/MNISTEvaluator.java
new file mode 100644
index 0000000..a5b68e0
--- /dev/null
+++ b/src/main/java/org/apache/horn/utils/MNISTEvaluator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.utils;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.core.LayeredNeuralNetwork;
+
+public class MNISTEvaluator {
+
+  private static int PIXELS = 28 * 28;
+  
+  private static double rescale(double x) {
+    return 1 - (255 - x) / 255;
+  }
+  
+  public static void main(String[] args) throws IOException {
+    if (args.length < 3) {
+      System.out.println("Usage: <TRAINED_MODEL> <TEST_IMAGES> <TEST_LABELS>");
+      System.out
+          .println("ex) /tmp/model t10k-images.idx3-ubyte t10k-labels.idx1-ubyte");
+      System.exit(1);
+    }
+
+    String modelPath = args[0];
+    String training_data = args[1];
+    String labels_data = args[2];
+
+    DataInputStream imagesIn = new DataInputStream(new FileInputStream(
+        new File(training_data)));
+    DataInputStream labelsIn = new DataInputStream(new FileInputStream(
+        new File(labels_data)));
+    
+    imagesIn.readInt(); // Magic number
+    int count = imagesIn.readInt();
+    labelsIn.readInt(); // Magic number
+    labelsIn.readInt(); // Count
+    imagesIn.readInt(); // Rows
+    imagesIn.readInt(); // Cols
+    
+    System.out.println("Evaluating " + count + " images");
+
+    byte[][] images = new byte[count][PIXELS];
+    byte[] labels = new byte[count];
+    for (int n = 0; n < count; n++) {
+      imagesIn.readFully(images[n]);
+      labels[n] = labelsIn.readByte();
+    }
+
+    HamaConfiguration conf = new HamaConfiguration();
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork(conf, modelPath);
+    
+    int correct = 0;
+    for (int i = 0; i < count; i++) {
+      double[] vals = new double[PIXELS];
+      for (int j = 0; j < PIXELS; j++) {
+        vals[j] = rescale((images[i][j] & 0xff));
+      }
+      int label = (labels[i] & 0xff);
+
+      DoubleVector instance = new DenseDoubleVector(vals);
+      DoubleVector result = ann.getOutput(instance);
+      
+      if(getNumber(result) == label) {
+        correct++;
+      }
+    }
+
+    System.out.println((double) correct / count);
+    // TODO System.out.println("Precision = " + (tp / (tp + fp)));
+    //System.out.println("Recall = " + (tp / (tp + fn)));
+    //System.out.println("Accuracy = " + ((tp + tn) / (tp + tn + fp + fn)));
+    
+    imagesIn.close();
+    labelsIn.close();
+  }
+
+  private static int getNumber(DoubleVector result) {
+    double max = 0;
+    int index = -1;
+    for(int x = 0; x < result.getLength(); x++) {
+      double curr = result.get(x);
+      if(max < curr) {
+        max = curr;
+        index = x;
+      }
+    }
+    return index;
+  }
+}

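One caveat in the new evaluator: getNumber seeds max with 0 and index with -1, so it returns -1 whenever no output exceeds 0. With sigmoid outputs this should not happen in practice, but a variant seeded from the first element avoids the edge case entirely; this is offered only as a possible follow-up sketch, not part of the commit:
```Java
import org.apache.hama.commons.math.DenseDoubleVector;
import org.apache.hama.commons.math.DoubleVector;

public class ArgMaxSketch {

  // Argmax seeded from the first element, so an all-non-positive vector
  // still maps to a valid index.
  static int argMax(DoubleVector result) {
    int index = 0;
    double max = result.get(0);
    for (int x = 1; x < result.getLength(); x++) {
      if (result.get(x) > max) {
        max = result.get(x);
        index = x;
      }
    }
    return index;
  }

  public static void main(String[] args) {
    DoubleVector v = new DenseDoubleVector(new double[] { -0.2, -0.1, -0.5 });
    System.out.println(argMax(v)); // 1
  }
}
```
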
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/test/java/org/apache/horn/core/TestNeuron.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestNeuron.java b/src/test/java/org/apache/horn/core/TestNeuron.java
index 5f2bb59..0e4ba8e 100644
--- a/src/test/java/org/apache/horn/core/TestNeuron.java
+++ b/src/test/java/org/apache/horn/core/TestNeuron.java
@@ -24,6 +24,7 @@ import java.util.List;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.horn.funcs.CrossEntropy;
 import org.apache.horn.funcs.Sigmoid;
 
 public class TestNeuron extends TestCase {
@@ -43,9 +44,10 @@ public class TestNeuron extends TestCase {
         sum += m.getInput() * m.getWeight();
       }
       sum += (bias * theta);
+      System.out.println(new CrossEntropy().apply(0.000001, 1.0));
       this.feedforward(new Sigmoid().apply(sum));
     }
-
+    
     @Override
     public void backward(
         Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
index 7e0cac9..a6914ef 100644
--- a/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
+++ b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
@@ -46,8 +46,8 @@ import org.apache.hama.commons.math.DoubleMatrix;
 import org.apache.hama.commons.math.DoubleVector;
 import org.apache.hama.ml.util.DefaultFeatureTransformer;
 import org.apache.hama.ml.util.FeatureTransformer;
-import org.apache.horn.core.AbstractLayeredNeuralNetwork.LearningStyle;
-import org.apache.horn.core.AbstractLayeredNeuralNetwork.TrainingMethod;
+import org.apache.horn.core.Constants.LearningStyle;
+import org.apache.horn.core.Constants.TrainingMethod;
 import org.apache.horn.funcs.FunctionFactory;
 import org.junit.Test;
 import org.mortbay.log.Log;

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/9f35e9fb/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java b/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
index 80a08f2..bb404d6 100644
--- a/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
+++ b/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
@@ -23,12 +23,15 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URI;
 
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hama.Constants;
-import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.commons.io.VectorWritable;
 import org.apache.hama.commons.math.DenseDoubleVector;
@@ -39,7 +42,8 @@ import org.apache.horn.core.LayeredNeuralNetwork;
 /**
  * Test the functionality of NeuralNetwork Example.
  */
-public class MultiLayerPerceptronTest extends HamaCluster {
+public class MultiLayerPerceptronTest extends TestCase { // HamaCluster {
+  private static final Log LOG = LogFactory.getLog(MultiLayerPerceptronTest.class);
   private HamaConfiguration conf;
   private FileSystem fs;
   private String MODEL_PATH = "/tmp/neuralnets.model";
@@ -47,10 +51,10 @@ public class MultiLayerPerceptronTest extends HamaCluster {
   private String SEQTRAIN_DATA = "/tmp/test-neuralnets.data";
 
   public MultiLayerPerceptronTest() {
-    conf = new HamaConfiguration();
+    conf = new HamaConfiguration();/*
     conf.set("bsp.master.address", "localhost");
     conf.setBoolean("hama.child.redirect.log.console", true);
-    conf.setBoolean("hama.messenger.runtime.compression", true);
+    conf.setBoolean("hama.messenger.runtime.compression", false);
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         conf.get("bsp.master.address"));
     conf.set("bsp.local.dir", "/tmp/hama-test");
@@ -58,7 +62,7 @@ public class MultiLayerPerceptronTest extends HamaCluster {
     conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
     conf.set("hama.sync.client.class",
         org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
-            .getCanonicalName());
+            .getCanonicalName());*/
   }
 
   @Override
@@ -77,8 +81,7 @@ public class MultiLayerPerceptronTest extends HamaCluster {
 
     String featureDataPath = "src/test/resources/neuralnets_classification_test.txt";
     try {
-      LayeredNeuralNetwork ann = new LayeredNeuralNetwork(conf,
-          MODEL_PATH);
+      LayeredNeuralNetwork ann = new LayeredNeuralNetwork(conf, MODEL_PATH);
 
       // process data in streaming approach
       FileSystem fs = FileSystem.get(new URI(featureDataPath), conf);
@@ -126,7 +129,7 @@ public class MultiLayerPerceptronTest extends HamaCluster {
     } finally {
       fs.delete(new Path(RESULT_PATH), true);
       fs.delete(new Path(MODEL_PATH), true);
-      //fs.delete(new Path(SEQTRAIN_DATA), true);
+      fs.delete(new Path(SEQTRAIN_DATA), true);
     }
   }
 
@@ -161,8 +164,8 @@ public class MultiLayerPerceptronTest extends HamaCluster {
 
     try {
       HornJob ann = MultiLayerPerceptron.createJob(conf, MODEL_PATH,
-          SEQTRAIN_DATA, 0.4, 0.2, 0.01, featureDimension, labelDimension,
-          1000, 2);
+          SEQTRAIN_DATA, 0.4, 0.2, 0.01, featureDimension, 8, labelDimension,
+          300, 10000);
 
       long startTime = System.currentTimeMillis();
       if (ann.waitForCompletion(true)) {