You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@horn.apache.org by ed...@apache.org on 2016/04/26 05:46:22 UTC

[1/4] incubator-horn git commit: Code refactoring

Repository: incubator-horn
Updated Branches:
  refs/heads/master 277773c0a -> ac8eaf8e1


http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
new file mode 100644
index 0000000..7e4328f
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
@@ -0,0 +1,642 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.ml.util.DefaultFeatureTransformer;
+import org.apache.hama.ml.util.FeatureTransformer;
+import org.apache.horn.core.AbstractLayeredNeuralNetwork.LearningStyle;
+import org.apache.horn.core.AbstractLayeredNeuralNetwork.TrainingMethod;
+import org.apache.horn.funcs.FunctionFactory;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+/**
+ * Test the functionality of SmallLayeredNeuralNetwork.
+ * 
+ */
+public class TestSmallLayeredNeuralNetwork extends MLTestBase {
+
+  @Test
+  public void testReadWrite() {
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+    ann.addLayer(2, false,
+        FunctionFactory.createDoubleFunction("IdentityFunction"));
+    ann.addLayer(5, false,
+        FunctionFactory.createDoubleFunction("IdentityFunction"));
+    ann.addLayer(1, true,
+        FunctionFactory.createDoubleFunction("IdentityFunction"));
+    ann.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("SquaredError"));
+    double learningRate = 0.2;
+    // ann.setLearningRate(learningRate);
+    double momentumWeight = 0.5;
+    // ann.setMomemtumWeight(momentumWeight);
+    double regularizationWeight = 0.05;
+    //ann.setRegularizationWeight(regularizationWeight);
+    // intentionally initialize all weights to 0.5
+    DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
+    matrices[0] = new DenseDoubleMatrix(5, 3, 0.2);
+    matrices[1] = new DenseDoubleMatrix(1, 6, 0.8);
+    ann.setWeightMatrices(matrices);
+    ann.setLearningStyle(LearningStyle.UNSUPERVISED);
+    
+    FeatureTransformer defaultFeatureTransformer = new DefaultFeatureTransformer();
+    ann.setFeatureTransformer(defaultFeatureTransformer);
+    
+
+    // write to file
+    String modelPath = "/tmp/testSmallLayeredNeuralNetworkReadWrite";
+    ann.setModelPath(modelPath);
+    try {
+      ann.writeModelToFile();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // read from file
+    LayeredNeuralNetwork annCopy = new LayeredNeuralNetwork(new HamaConfiguration(), modelPath);
+    assertEquals(annCopy.getClass().getSimpleName(), annCopy.getModelType());
+    assertEquals(modelPath, annCopy.getModelPath());
+    // assertEquals(learningRate, annCopy.getLearningRate(), 0.000001);
+    // assertEquals(momentumWeight, annCopy.getMomemtumWeight(), 0.000001);
+    //assertEquals(regularizationWeight, annCopy.getRegularizationWeight(),
+    //    0.000001);
+    assertEquals(TrainingMethod.GRADIENT_DESCENT, annCopy.getTrainingMethod());
+    assertEquals(LearningStyle.UNSUPERVISED, annCopy.getLearningStyle());
+
+    // compare weights
+    DoubleMatrix[] weightsMatrices = annCopy.getWeightMatrices();
+    for (int i = 0; i < weightsMatrices.length; ++i) {
+      DoubleMatrix expectMat = matrices[i];
+      DoubleMatrix actualMat = weightsMatrices[i];
+      for (int j = 0; j < expectMat.getRowCount(); ++j) {
+        for (int k = 0; k < expectMat.getColumnCount(); ++k) {
+          assertEquals(expectMat.get(j, k), actualMat.get(j, k), 0.000001);
+        }
+      }
+    }
+    
+    FeatureTransformer copyTransformer = annCopy.getFeatureTransformer();
+    assertEquals(defaultFeatureTransformer.getClass().getName(), copyTransformer.getClass().getName());
+  }
+
+  @Test
+  /**
+   * Test the forward functionality.
+   */
+  public void testOutput() {
+    // first network
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+    ann.addLayer(2, false,
+        FunctionFactory.createDoubleFunction("IdentityFunction"));
+    ann.addLayer(5, false,
+        FunctionFactory.createDoubleFunction("IdentityFunction"));
+    ann.addLayer(1, true,
+        FunctionFactory.createDoubleFunction("IdentityFunction"));
+    ann.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("SquaredError"));
+    // ann.setLearningRate(0.1);
+    // intentionally initialize all weights to 0.5
+    DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
+    matrices[0] = new DenseDoubleMatrix(5, 3, 0.5);
+    matrices[1] = new DenseDoubleMatrix(1, 6, 0.5);
+    ann.setWeightMatrices(matrices);
+
+    double[] arr = new double[] { 0, 1 };
+    DoubleVector training = new DenseDoubleVector(arr);
+    DoubleVector result = ann.getOutput(training);
+    assertEquals(1, result.getDimension());
+    // assertEquals(3, result.get(0), 0.000001);
+
+    // second network
+    LayeredNeuralNetwork ann2 = new LayeredNeuralNetwork();
+    ann2.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann2.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann2.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann2.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("SquaredError"));
+    // ann2.setLearningRate(0.3);
+    // intentionally initialize all weights to 0.5
+    DoubleMatrix[] matrices2 = new DenseDoubleMatrix[2];
+    matrices2[0] = new DenseDoubleMatrix(3, 3, 0.5);
+    matrices2[1] = new DenseDoubleMatrix(1, 4, 0.5);
+    ann2.setWeightMatrices(matrices2);
+
+    double[] test = { 0, 0 };
+    double[] result2 = { 0.807476 };
+
+    DoubleVector vec = ann2.getOutput(new DenseDoubleVector(test));
+    assertArrayEquals(result2, vec.toArray(), 0.000001);
+
+    LayeredNeuralNetwork ann3 = new LayeredNeuralNetwork();
+    ann3.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann3.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann3.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann3.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("SquaredError"));
+    // ann3.setLearningRate(0.3);
+    // intentionally initialize all weights to 0.5
+    DoubleMatrix[] initMatrices = new DenseDoubleMatrix[2];
+    initMatrices[0] = new DenseDoubleMatrix(3, 3, 0.5);
+    initMatrices[1] = new DenseDoubleMatrix(1, 4, 0.5);
+    ann3.setWeightMatrices(initMatrices);
+
+    double[] instance = { 0, 1 };
+    DoubleVector output = ann3.getOutput(new DenseDoubleVector(instance));
+    assertEquals(0.8315410, output.get(0), 0.000001);
+  }
+
+  @Test
+  public void testXORlocal() {
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+    ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("SquaredError"));
+    // ann.setLearningRate(0.5);
+    // ann.setMomemtumWeight(0.0);
+
+    int iterations = 50000; // iteration should be set to a very large number
+    double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
+    for (int i = 0; i < iterations; ++i) {
+      DoubleMatrix[] matrices = null;
+      for (int j = 0; j < instances.length; ++j) {
+        matrices = ann.trainByInstance(new DenseDoubleVector(instances[j
+            % instances.length]));
+        ann.updateWeightMatrices(matrices);
+      }
+    }
+
+    for (int i = 0; i < instances.length; ++i) {
+      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+      // the expected output is the last element in array
+      double result = instances[i][2];
+      double actual = ann.getOutput(input).get(0);
+      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+        Log.info("Neural network failes to lear the XOR.");
+      }
+    }
+
+    // write model into file and read out
+    String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocal";
+    ann.setModelPath(modelPath);
+    try {
+      ann.writeModelToFile();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    LayeredNeuralNetwork annCopy = new LayeredNeuralNetwork(new HamaConfiguration(), modelPath);
+    // test on instances
+    for (int i = 0; i < instances.length; ++i) {
+      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+      // the expected output is the last element in array
+      double result = instances[i][2];
+      double actual = annCopy.getOutput(input).get(0);
+      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+        Log.info("Neural network failes to lear the XOR.");
+      }
+    }
+  }
+
+  @Test
+  public void testXORWithMomentum() {
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+    ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("SquaredError"));
+    // ann.setLearningRate(0.6);
+    // ann.setMomemtumWeight(0.3);
+
+    int iterations = 2000; // iteration should be set to a very large number
+    double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
+    for (int i = 0; i < iterations; ++i) {
+      for (int j = 0; j < instances.length; ++j) {
+        ann.trainOnline(new DenseDoubleVector(instances[j % instances.length]));
+      }
+    }
+
+    for (int i = 0; i < instances.length; ++i) {
+      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+      // the expected output is the last element in array
+      double result = instances[i][2];
+      double actual = ann.getOutput(input).get(0);
+      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+        Log.info("Neural network failes to lear the XOR.");
+      }
+    }
+
+    // write model into file and read out
+    String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithMomentum";
+    ann.setModelPath(modelPath);
+    try {
+      ann.writeModelToFile();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    LayeredNeuralNetwork annCopy = new LayeredNeuralNetwork(new HamaConfiguration(), modelPath);
+    // test on instances
+    for (int i = 0; i < instances.length; ++i) {
+      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+      // the expected output is the last element in array
+      double result = instances[i][2];
+      double actual = annCopy.getOutput(input).get(0);
+      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+        Log.info("Neural network failes to lear the XOR.");
+      }
+    }
+  }
+
+  @Test
+  public void testXORLocalWithRegularization() {
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+    ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("SquaredError"));
+    // ann.setLearningRate(0.7);
+    // ann.setMomemtumWeight(0.5);
+    //ann.setRegularizationWeight(0.002);
+
+    int iterations = 5000; // iteration should be set to a very large number
+    double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
+    for (int i = 0; i < iterations; ++i) {
+      for (int j = 0; j < instances.length; ++j) {
+        ann.trainOnline(new DenseDoubleVector(instances[j % instances.length]));
+      }
+    }
+
+    for (int i = 0; i < instances.length; ++i) {
+      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+      // the expected output is the last element in array
+      double result = instances[i][2];
+      double actual = ann.getOutput(input).get(0);
+      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+        Log.info("Neural network failes to lear the XOR.");
+      }
+    }
+
+    // write model into file and read out
+    String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithRegularization";
+    ann.setModelPath(modelPath);
+    try {
+      ann.writeModelToFile();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    LayeredNeuralNetwork annCopy = new LayeredNeuralNetwork(new HamaConfiguration(), modelPath);
+    // test on instances
+    for (int i = 0; i < instances.length; ++i) {
+      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+      // the expected output is the last element in array
+      double result = instances[i][2];
+      double actual = annCopy.getOutput(input).get(0);
+      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+        Log.info("Neural network failes to lear the XOR.");
+      }
+    }
+  }
+
+  @Test
+  public void testTwoClassClassification() {
+    // use logistic regression data
+    String filepath = "src/test/resources/logistic_regression_data.txt";
+    List<double[]> instanceList = new ArrayList<double[]>();
+
+    try {
+      BufferedReader br = new BufferedReader(new FileReader(filepath));
+      String line = null;
+      while ((line = br.readLine()) != null) {
+        String[] tokens = line.trim().split(",");
+        double[] instance = new double[tokens.length];
+        for (int i = 0; i < tokens.length; ++i) {
+          instance[i] = Double.parseDouble(tokens[i]);
+        }
+        instanceList.add(instance);
+      }
+      br.close();
+    } catch (FileNotFoundException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
+    
+    int dimension = instanceList.get(0).length - 1;
+
+    // divide dataset into training and testing
+    List<double[]> testInstances = new ArrayList<double[]>();
+    testInstances.addAll(instanceList.subList(instanceList.size() - 100,
+        instanceList.size()));
+    List<double[]> trainingInstances = instanceList.subList(0,
+        instanceList.size() - 100);
+
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+    // ann.setLearningRate(0.001);
+    // ann.setMomemtumWeight(0.1);
+    //ann.setRegularizationWeight(0.01);
+    ann.addLayer(dimension, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(dimension, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(dimension, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("CrossEntropy"));
+
+    long start = new Date().getTime();
+    int iterations = 1000;
+    for (int i = 0; i < iterations; ++i) {
+      for (double[] trainingInstance : trainingInstances) {
+        ann.trainOnline(new DenseDoubleVector(trainingInstance));
+      }
+    }
+    long end = new Date().getTime();
+    Log.info(String.format("Training time: %fs\n",
+        (double) (end - start) / 1000));
+
+    double errorRate = 0;
+    // calculate the error on test instance
+    for (double[] testInstance : testInstances) {
+      DoubleVector instance = new DenseDoubleVector(testInstance);
+      double expected = instance.get(instance.getDimension() - 1);
+      instance = instance.slice(instance.getDimension() - 1);
+      double actual = ann.getOutput(instance).get(0);
+      if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
+        ++errorRate;
+      }
+    }
+    errorRate /= testInstances.size();
+
+    Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
+  }
+  
+  @Test
+  public void testLogisticRegression() {
+    this.testLogisticRegressionDistributedVersion();
+    this.testLogisticRegressionDistributedVersionWithFeatureTransformer();
+  }
+
+  public void testLogisticRegressionDistributedVersion() {
+    // write data into a sequence file
+    String tmpStrDatasetPath = "/tmp/logistic_regression_data";
+    Path tmpDatasetPath = new Path(tmpStrDatasetPath);
+    String strDataPath = "src/test/resources/logistic_regression_data.txt";
+    String modelPath = "/tmp/logistic-regression-distributed-model";
+
+    Configuration conf = new Configuration();
+    List<double[]> instanceList = new ArrayList<double[]>();
+    List<double[]> trainingInstances = null;
+    List<double[]> testInstances = null;
+
+    try {
+      FileSystem fs = FileSystem.get(new URI(tmpStrDatasetPath), conf);
+      fs.delete(tmpDatasetPath, true);
+      if (fs.exists(tmpDatasetPath)) {
+        fs.createNewFile(tmpDatasetPath);
+      }
+
+      BufferedReader br = new BufferedReader(new FileReader(strDataPath));
+      String line = null;
+      int count = 0;
+      while ((line = br.readLine()) != null) {
+        String[] tokens = line.trim().split(",");
+        double[] instance = new double[tokens.length];
+        for (int i = 0; i < tokens.length; ++i) {
+          instance[i] = Double.parseDouble(tokens[i]);
+        }
+        instanceList.add(instance);
+      }
+      br.close();
+      
+      zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
+      
+      // write training data to temporal sequence file
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+          tmpDatasetPath, LongWritable.class, VectorWritable.class);
+      int testSize = 150;
+
+      Collections.shuffle(instanceList);
+      testInstances = new ArrayList<double[]>();
+      testInstances.addAll(instanceList.subList(instanceList.size() - testSize,
+          instanceList.size()));
+      trainingInstances = instanceList.subList(0, instanceList.size()
+          - testSize);
+
+      for (double[] instance : trainingInstances) {
+        DoubleVector vec = new DenseDoubleVector(instance);
+        writer.append(new LongWritable(count++), new VectorWritable(vec));
+      }
+      writer.close();
+    } catch (FileNotFoundException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+
+    // create model
+    int dimension = 8;
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+    // ann.setLearningRate(0.7);
+    // ann.setMomemtumWeight(0.5);
+    //ann.setRegularizationWeight(0.1);
+    ann.addLayer(dimension, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(dimension, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(dimension, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("CrossEntropy"));
+    ann.setModelPath(modelPath);
+
+    long start = new Date().getTime();
+    Map<String, String> trainingParameters = new HashMap<String, String>();
+    trainingParameters.put("tasks", "5");
+    trainingParameters.put("training.max.iterations", "2000");
+    trainingParameters.put("training.batch.size", "300");
+    trainingParameters.put("convergence.check.interval", "1000");
+    //ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
+
+    long end = new Date().getTime();
+
+    // validate results
+    double errorRate = 0;
+    // calculate the error on test instance
+    for (double[] testInstance : testInstances) {
+      DoubleVector instance = new DenseDoubleVector(testInstance);
+      double expected = instance.get(instance.getDimension() - 1);
+      instance = instance.slice(instance.getDimension() - 1);
+      double actual = ann.getOutput(instance).get(0);
+      if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
+        ++errorRate;
+      }
+    }
+    errorRate /= testInstances.size();
+
+    Log.info(String.format("Training time: %fs\n",
+        (double) (end - start) / 1000));
+    Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
+  }
+
+  public void testLogisticRegressionDistributedVersionWithFeatureTransformer() {
+    // write data into a sequence file
+    String tmpStrDatasetPath = "/tmp/logistic_regression_data_feature_transformer";
+    Path tmpDatasetPath = new Path(tmpStrDatasetPath);
+    String strDataPath = "src/test/resources/logistic_regression_data.txt";
+    String modelPath = "/tmp/logistic-regression-distributed-model-feature-transformer";
+
+    Configuration conf = new Configuration();
+    List<double[]> instanceList = new ArrayList<double[]>();
+    List<double[]> trainingInstances = null;
+    List<double[]> testInstances = null;
+
+    try {
+      FileSystem fs = FileSystem.get(new URI(tmpStrDatasetPath), conf);
+      fs.delete(tmpDatasetPath, true);
+      if (fs.exists(tmpDatasetPath)) {
+        fs.createNewFile(tmpDatasetPath);
+      }
+
+      BufferedReader br = new BufferedReader(new FileReader(strDataPath));
+      String line = null;
+      int count = 0;
+      while ((line = br.readLine()) != null) {
+        String[] tokens = line.trim().split(",");
+        double[] instance = new double[tokens.length];
+        for (int i = 0; i < tokens.length; ++i) {
+          instance[i] = Double.parseDouble(tokens[i]);
+        }
+        instanceList.add(instance);
+      }
+      br.close();
+      
+      zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
+      
+      // write training data to temporal sequence file
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+          tmpDatasetPath, LongWritable.class, VectorWritable.class);
+      int testSize = 150;
+
+      Collections.shuffle(instanceList);
+      testInstances = new ArrayList<double[]>();
+      testInstances.addAll(instanceList.subList(instanceList.size() - testSize,
+          instanceList.size()));
+      trainingInstances = instanceList.subList(0, instanceList.size()
+          - testSize);
+
+      for (double[] instance : trainingInstances) {
+        DoubleVector vec = new DenseDoubleVector(instance);
+        writer.append(new LongWritable(count++), new VectorWritable(vec));
+      }
+      writer.close();
+    } catch (FileNotFoundException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+
+    // create model
+    int dimension = 8;
+    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+    // ann.setLearningRate(0.7);
+    // ann.setMomemtumWeight(0.5);
+    //ann.setRegularizationWeight(0.1);
+    ann.addLayer(dimension, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(dimension, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(dimension, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+    ann.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("CrossEntropy"));
+    ann.setModelPath(modelPath);
+    
+    FeatureTransformer featureTransformer = new DefaultFeatureTransformer();
+    
+    ann.setFeatureTransformer(featureTransformer);
+
+    long start = new Date().getTime();
+    Map<String, String> trainingParameters = new HashMap<String, String>();
+    trainingParameters.put("tasks", "5");
+    trainingParameters.put("training.max.iterations", "2000");
+    trainingParameters.put("training.batch.size", "300");
+    trainingParameters.put("convergence.check.interval", "1000");
+    //ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
+
+    long end = new Date().getTime();
+
+    // validate results
+    double errorRate = 0;
+    // calculate the error on test instance
+    for (double[] testInstance : testInstances) {
+      DoubleVector instance = new DenseDoubleVector(testInstance);
+      double expected = instance.get(instance.getDimension() - 1);
+      instance = instance.slice(instance.getDimension() - 1);
+      instance = featureTransformer.transform(instance);
+      double actual = ann.getOutput(instance).get(0);
+      if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
+        ++errorRate;
+      }
+    }
+    errorRate /= testInstances.size();
+
+    Log.info(String.format("Training time: %fs\n",
+        (double) (end - start) / 1000));
+    Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetworkMessage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetworkMessage.java b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetworkMessage.java
new file mode 100644
index 0000000..a0c66d2
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetworkMessage.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.horn.core.ParameterMessage;
+import org.junit.Test;
+
+/**
+ * Test the functionalities of SmallLayeredNeuralNetworkMessage.
+ * 
+ */
+public class TestSmallLayeredNeuralNetworkMessage {
+
+  @Test
+  public void testReadWriteWithoutPrev() {
+    double error = 0.22;
+    double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 },
+        { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } };
+    double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } };
+    DoubleMatrix[] matrices = new DoubleMatrix[2];
+    matrices[0] = new DenseDoubleMatrix(matrix1);
+    matrices[1] = new DenseDoubleMatrix(matrix2);
+
+    boolean isConverge = false;
+
+    ParameterMessage message = new ParameterMessage(
+        error, isConverge, matrices, null);
+    Configuration conf = new Configuration();
+    String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessage";
+    Path path = new Path(strPath);
+    try {
+      FileSystem fs = FileSystem.get(new URI(strPath), conf);
+      FSDataOutputStream out = fs.create(path);
+      message.write(out);
+      out.close();
+
+      FSDataInputStream in = fs.open(path);
+      ParameterMessage readMessage = new ParameterMessage(
+          0, isConverge, null, null);
+      readMessage.readFields(in);
+      in.close();
+      assertEquals(error, readMessage.getTrainingError(), 0.000001);
+      assertFalse(readMessage.isConverge());
+      DoubleMatrix[] readMatrices = readMessage.getCurMatrices();
+      assertEquals(2, readMatrices.length);
+      for (int i = 0; i < readMatrices.length; ++i) {
+        double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i])
+            .getValues();
+        double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i])
+            .getValues();
+        for (int r = 0; r < doubleMatrices.length; ++r) {
+          assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
+        }
+      }
+
+      DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices();
+      assertNull(readPrevMatrices);
+
+      // delete
+      fs.delete(path, true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testReadWriteWithPrev() {
+    double error = 0.22;
+    boolean isConverge = true;
+
+    double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 },
+        { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } };
+    double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } };
+    DoubleMatrix[] matrices = new DoubleMatrix[2];
+    matrices[0] = new DenseDoubleMatrix(matrix1);
+    matrices[1] = new DenseDoubleMatrix(matrix2);
+
+    double[][] prevMatrix1 = new double[][] { { 0.1, 0.1, 0.2, 0.3 },
+        { 0.2, 0.4, 0.1, 0.5 }, { 0.5, 0.1, 0.5, 0.2 } };
+    double[][] prevMatrix2 = new double[][] { { 0.1, 0.2, 0.5, 0.9 },
+        { 0.3, 0.5, 0.2, 0.6 }, { 0.6, 0.8, 0.7, 0.5 } };
+
+    DoubleMatrix[] prevMatrices = new DoubleMatrix[2];
+    prevMatrices[0] = new DenseDoubleMatrix(prevMatrix1);
+    prevMatrices[1] = new DenseDoubleMatrix(prevMatrix2);
+
+    ParameterMessage message = new ParameterMessage(
+        error, isConverge, matrices, prevMatrices);
+    Configuration conf = new Configuration();
+    String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessageWithPrev";
+    Path path = new Path(strPath);
+    try {
+      FileSystem fs = FileSystem.get(new URI(strPath), conf);
+      FSDataOutputStream out = fs.create(path);
+      message.write(out);
+      out.close();
+
+      FSDataInputStream in = fs.open(path);
+      ParameterMessage readMessage = new ParameterMessage(
+          0, isConverge, null, null);
+      readMessage.readFields(in);
+      in.close();
+
+      assertTrue(readMessage.isConverge());
+
+      DoubleMatrix[] readMatrices = readMessage.getCurMatrices();
+      assertEquals(2, readMatrices.length);
+      for (int i = 0; i < readMatrices.length; ++i) {
+        double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i])
+            .getValues();
+        double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i])
+            .getValues();
+        for (int r = 0; r < doubleMatrices.length; ++r) {
+          assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
+        }
+      }
+
+      DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices();
+      assertEquals(2, readPrevMatrices.length);
+      for (int i = 0; i < readPrevMatrices.length; ++i) {
+        double[][] doubleMatrices = ((DenseDoubleMatrix) readPrevMatrices[i])
+            .getValues();
+        double[][] doubleExpected = ((DenseDoubleMatrix) prevMatrices[i])
+            .getValues();
+        for (int r = 0; r < doubleMatrices.length; ++r) {
+          assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
+        }
+      }
+
+      // delete
+      fs.delete(path, true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java b/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
new file mode 100644
index 0000000..fd24c4f
--- /dev/null
+++ b/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.examples;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.core.HornJob;
+import org.apache.horn.core.LayeredNeuralNetwork;
+
+/**
+ * Test the functionality of NeuralNetwork Example.
+ */
+public class MultiLayerPerceptronTest extends HamaCluster {
+  private HamaConfiguration conf;
+  private FileSystem fs;
+  private String MODEL_PATH = "/tmp/neuralnets.model";
+  private String RESULT_PATH = "/tmp/neuralnets.txt";
+  private String SEQTRAIN_DATA = "/tmp/test-neuralnets.data";
+
+  public MultiLayerPerceptronTest() {
+    conf = new HamaConfiguration();
+    conf.set("bsp.master.address", "localhost");
+    conf.setBoolean("hama.child.redirect.log.console", true);
+    conf.setBoolean("hama.messenger.runtime.compression", true);
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        conf.get("bsp.master.address"));
+    conf.set("bsp.local.dir", "/tmp/hama-test");
+    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    conf.set("hama.sync.client.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  public void testNeuralnetsLabeling() throws IOException {
+    this.neuralNetworkTraining();
+
+    String featureDataPath = "src/test/resources/neuralnets_classification_test.txt";
+    try {
+      LayeredNeuralNetwork ann = new LayeredNeuralNetwork(conf,
+          MODEL_PATH);
+
+      // process data in streaming approach
+      FileSystem fs = FileSystem.get(new URI(featureDataPath), conf);
+      BufferedReader br = new BufferedReader(new InputStreamReader(
+          fs.open(new Path(featureDataPath))));
+
+      String line = null;
+      line = null;
+
+      // compare results with ground-truth
+      BufferedReader groundTruthReader = new BufferedReader(new FileReader(
+          "src/test/resources/neuralnets_classification_label.txt"));
+
+      double correct = 0;
+      int samples = 0;
+      while ((line = br.readLine()) != null) {
+        if (line.trim().length() == 0) {
+          continue;
+        }
+        String[] tokens = line.trim().split(",");
+        double[] vals = new double[tokens.length];
+        for (int i = 0; i < tokens.length; ++i) {
+          vals[i] = Double.parseDouble(tokens[i]);
+        }
+        DoubleVector instance = new DenseDoubleVector(vals);
+        DoubleVector result = ann.getOutput(instance);
+        double actual = result.toArray()[0];
+        double expected = Double.parseDouble(groundTruthReader.readLine());
+
+        LOG.info("evaluated: " + actual + ", expected: " + expected);
+        if (actual < 0.5 && expected < 0.5 || actual >= 0.5 && expected >= 0.5) {
+          ++correct;
+        }
+        samples++;
+      }
+
+      groundTruthReader.close();
+      br.close();
+
+      LOG.info("## Precision: " + (correct / samples));
+      assertTrue((correct / samples) > 0.5);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      fs.delete(new Path(RESULT_PATH), true);
+      fs.delete(new Path(MODEL_PATH), true);
+      fs.delete(new Path(SEQTRAIN_DATA), true);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private void neuralNetworkTraining() {
+    String strTrainingDataPath = "src/test/resources/neuralnets_classification_training.txt";
+    int featureDimension = 8;
+    int labelDimension = 1;
+
+    Path sequenceTrainingDataPath = new Path(SEQTRAIN_DATA);
+    try {
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+          sequenceTrainingDataPath, LongWritable.class, VectorWritable.class);
+      BufferedReader br = new BufferedReader(
+          new FileReader(strTrainingDataPath));
+      String line = null;
+      // convert the data in sequence file format
+      while ((line = br.readLine()) != null) {
+        String[] tokens = line.split(",");
+        double[] vals = new double[tokens.length];
+        for (int i = 0; i < tokens.length; ++i) {
+          vals[i] = Double.parseDouble(tokens[i]);
+        }
+        writer.append(new LongWritable(), new VectorWritable(
+            new DenseDoubleVector(vals)));
+      }
+      writer.close();
+      br.close();
+    } catch (IOException e1) {
+      e1.printStackTrace();
+    }
+
+    try {
+      HornJob ann = MultiLayerPerceptron.createJob(conf, MODEL_PATH,
+          SEQTRAIN_DATA, 0.4, 0.2, 0.01, featureDimension, labelDimension,
+          1000, 2);
+
+      long startTime = System.currentTimeMillis();
+      if (ann.waitForCompletion(true)) {
+        LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime)
+            / 1000.0 + " seconds");
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java b/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java
deleted file mode 100644
index 932b17a..0000000
--- a/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.examples;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaCluster;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.horn.bsp.HornJob;
-import org.apache.horn.bsp.SmallLayeredNeuralNetwork;
-
-/**
- * Test the functionality of NeuralNetwork Example.
- * 
- */
-public class NeuralNetworkTest extends HamaCluster {
-  private HamaConfiguration conf;
-  private FileSystem fs;
-  private String MODEL_PATH = "/tmp/neuralnets.model";
-  private String RESULT_PATH = "/tmp/neuralnets.txt";
-  private String SEQTRAIN_DATA = "/tmp/test-neuralnets.data";
-
-  public NeuralNetworkTest() {
-    conf = new HamaConfiguration();
-    conf.set("bsp.master.address", "localhost");
-    conf.setBoolean("hama.child.redirect.log.console", true);
-    conf.setBoolean("hama.messenger.runtime.compression", true);
-    assertEquals("Make sure master addr is set to localhost:", "localhost",
-        conf.get("bsp.master.address"));
-    conf.set("bsp.local.dir", "/tmp/hama-test");
-    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
-    conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
-    conf.set("hama.sync.client.class",
-        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
-            .getCanonicalName());
-  }
-
-  @Override
-  protected void setUp() throws Exception {
-    super.setUp();
-    fs = FileSystem.get(conf);
-  }
-
-  @Override
-  public void tearDown() throws Exception {
-    super.tearDown();
-  }
-
-  public void testNeuralnetsLabeling() throws IOException {
-    this.neuralNetworkTraining();
-
-    String featureDataPath = "src/test/resources/neuralnets_classification_test.txt";
-    try {
-      SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(conf,
-          MODEL_PATH);
-
-      // process data in streaming approach
-      FileSystem fs = FileSystem.get(new URI(featureDataPath), conf);
-      BufferedReader br = new BufferedReader(new InputStreamReader(
-          fs.open(new Path(featureDataPath))));
-      Path outputPath = new Path(RESULT_PATH);
-      if (fs.exists(outputPath)) {
-        fs.delete(outputPath, true);
-      }
-      BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
-          fs.create(outputPath)));
-
-      String line = null;
-
-      while ((line = br.readLine()) != null) {
-        if (line.trim().length() == 0) {
-          continue;
-        }
-        String[] tokens = line.trim().split(",");
-        double[] vals = new double[tokens.length];
-        for (int i = 0; i < tokens.length; ++i) {
-          vals[i] = Double.parseDouble(tokens[i]);
-        }
-        DoubleVector instance = new DenseDoubleVector(vals);
-        DoubleVector result = ann.getOutput(instance);
-        double[] arrResult = result.toArray();
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < arrResult.length; ++i) {
-          sb.append(arrResult[i]);
-          if (i != arrResult.length - 1) {
-            sb.append(",");
-          } else {
-            sb.append("\n");
-          }
-        }
-        bw.write(sb.toString());
-      }
-
-      br.close();
-      bw.close();
-
-      // compare results with ground-truth
-      BufferedReader groundTruthReader = new BufferedReader(new FileReader(
-          "src/test/resources/neuralnets_classification_label.txt"));
-      List<Double> groundTruthList = new ArrayList<Double>();
-      line = null;
-      while ((line = groundTruthReader.readLine()) != null) {
-        groundTruthList.add(Double.parseDouble(line));
-      }
-      groundTruthReader.close();
-
-      BufferedReader resultReader = new BufferedReader(new FileReader(
-          RESULT_PATH));
-      List<Double> resultList = new ArrayList<Double>();
-      while ((line = resultReader.readLine()) != null) {
-        resultList.add(Double.parseDouble(line));
-      }
-      resultReader.close();
-      int total = resultList.size();
-      double correct = 0;
-      for (int i = 0; i < groundTruthList.size(); ++i) {
-        double actual = resultList.get(i);
-        double expected = groundTruthList.get(i);
-        LOG.info("evaluated: " + actual + ", expected: " + expected);
-        if (actual < 0.5 && expected < 0.5 || actual >= 0.5 && expected >= 0.5) {
-          ++correct;
-        }
-      }
-
-      LOG.info("## Precision: " + (correct / total));
-      assertTrue((correct / total) > 0.5);
-
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      fs.delete(new Path(RESULT_PATH), true);
-      fs.delete(new Path(MODEL_PATH), true);
-      fs.delete(new Path(SEQTRAIN_DATA), true);
-    }
-  }
-
-  @SuppressWarnings("deprecation")
-  private void neuralNetworkTraining() {
-    String strTrainingDataPath = "src/test/resources/neuralnets_classification_training.txt";
-    int featureDimension = 8;
-    int labelDimension = 1;
-
-    Path sequenceTrainingDataPath = new Path(SEQTRAIN_DATA);
-    try {
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
-          sequenceTrainingDataPath, LongWritable.class, VectorWritable.class);
-      BufferedReader br = new BufferedReader(
-          new FileReader(strTrainingDataPath));
-      String line = null;
-      // convert the data in sequence file format
-      while ((line = br.readLine()) != null) {
-        String[] tokens = line.split(",");
-        double[] vals = new double[tokens.length];
-        for (int i = 0; i < tokens.length; ++i) {
-          vals[i] = Double.parseDouble(tokens[i]);
-        }
-        writer.append(new LongWritable(), new VectorWritable(
-            new DenseDoubleVector(vals)));
-      }
-      writer.close();
-      br.close();
-    } catch (IOException e1) {
-      e1.printStackTrace();
-    }
-
-    try {
-      HornJob ann = MultiLayerPerceptron.createJob(conf, MODEL_PATH,
-          SEQTRAIN_DATA, 0.4, 0.2, 0.01, featureDimension, labelDimension,
-          1000, 2);
-
-      long startTime = System.currentTimeMillis();
-      if (ann.waitForCompletion(true)) {
-        LOG.info("Job Finished in "
-            + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
-      }
-      
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/trainer/TestNeuron.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/trainer/TestNeuron.java b/src/test/java/org/apache/horn/trainer/TestNeuron.java
deleted file mode 100644
index b5f6bfc..0000000
--- a/src/test/java/org/apache/horn/trainer/TestNeuron.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.trainer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hama.HamaConfiguration;
-import org.apache.horn.bsp.Neuron;
-import org.apache.horn.bsp.Synapse;
-import org.apache.horn.funcs.Sigmoid;
-
-public class TestNeuron extends TestCase {
-  private static double learningRate = 0.1;
-  private static double bias = -1;
-  private static double theta = 0.8;
-
-  public static class MyNeuron extends
-      Neuron<Synapse<DoubleWritable, DoubleWritable>> {
-
-    @Override
-    public void setup(HamaConfiguration conf) {
-    }
-
-    @Override
-    public void forward(
-        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
-        throws IOException {
-      double sum = 0;
-      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
-        sum += m.getInput() * m.getWeight();
-      }
-      sum += (bias * theta);
-      this.feedforward(new Sigmoid().apply(sum));
-    }
-
-    @Override
-    public void backward(
-        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
-        throws IOException {
-      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
-        // Calculates error gradient for each neuron
-        double gradient = new Sigmoid().applyDerivative(this.getOutput()) * (m.getDelta() * m.getWeight());
-
-        // Propagates to lower layer
-        backpropagate(gradient);
-
-        // Weight corrections
-        double weight = learningRate * this.getOutput() * m.getDelta();
-        this.push(weight);
-      }
-    }
-
-  }
-
-  public void testProp() throws IOException {
-    List<Synapse<DoubleWritable, DoubleWritable>> x = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
-    x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
-        1.0), new DoubleWritable(0.5)));
-    x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
-        1.0), new DoubleWritable(0.4)));
-
-    MyNeuron n = new MyNeuron();
-    n.forward(x);
-    assertEquals(0.5249791874789399, n.getOutput());
-
-    x.clear();
-    x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
-        -0.1274), new DoubleWritable(-1.2)));
-    n.backward(x);
-    assertEquals(-0.006688234848481696, n.getUpdate());
-  }
-
-}


[3/4] incubator-horn git commit: Code refactoring

Posted by ed...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
new file mode 100644
index 0000000..f87e771
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.math.DoubleDoubleFunction;
+import org.apache.hama.commons.math.DoubleFunction;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.funcs.FunctionFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * AbstractLayeredNeuralNetwork defines the general operations for derivative
+ * layered models, include Linear Regression, Logistic Regression, Multilayer
+ * Perceptron, Autoencoder, and Restricted Boltzmann Machine, etc.
+ * 
+ * In general, these models consist of neurons which are aligned in layers.
+ * Between layers, for any two adjacent layers, the neurons are connected to
+ * form a bipartite weighted graph.
+ * 
+ */
+abstract class AbstractLayeredNeuralNetwork extends AbstractNeuralNetwork {
+
+  private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1;
+
+  double trainingError;
+
+  /* The momentumWeight */
+  protected double momentumWeight;
+
+  /* The cost function of the model */
+  protected DoubleDoubleFunction costFunction;
+
+  /* Record the size of each layer */
+  protected List<Integer> layerSizeList;
+
+  protected TrainingMethod trainingMethod;
+  
+  protected LearningStyle learningStyle;
+
+  public static enum TrainingMethod {
+    GRADIENT_DESCENT
+  }
+  
+  public static enum LearningStyle {
+    UNSUPERVISED,
+    SUPERVISED
+  }
+  
+  public AbstractLayeredNeuralNetwork() {
+    this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT;
+    this.trainingMethod = TrainingMethod.GRADIENT_DESCENT;
+    this.learningStyle = LearningStyle.SUPERVISED;
+  }
+
+  public AbstractLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
+    super(conf, modelPath);
+  }
+
+  public void setTrainingMethod(TrainingMethod method) {
+    this.trainingMethod = method;
+  }
+
+  public TrainingMethod getTrainingMethod() {
+    return this.trainingMethod;
+  }
+  
+  public void setLearningStyle(LearningStyle style) {
+    this.learningStyle = style;
+  }
+  
+  public LearningStyle getLearningStyle() {
+    return this.learningStyle;
+  }
+
+  /**
+   * Set the cost function for the model.
+   * 
+   * @param costFunction
+   */
+  public void setCostFunction(DoubleDoubleFunction costFunction) {
+    this.costFunction = costFunction;
+  }
+
+  /**
+   * Add a layer of neurons with specified size. If the added layer is not the
+   * first layer, it will automatically connects the neurons between with the
+   * previous layer.
+   * 
+   * @param size
+   * @param isFinalLayer If false, add a bias neuron.
+   * @param squashingFunction The squashing function for this layer, input layer
+   *          is f(x) = x by default.
+   * @return The layer index, starts with 0.
+   */
+  public abstract int addLayer(int size, boolean isFinalLayer,
+      DoubleFunction squashingFunction);
+
+  /**
+   * Get the size of a particular layer.
+   * 
+   * @param layer
+   * @return The layer size.
+   */
+  public int getLayerSize(int layer) {
+    Preconditions.checkArgument(
+        layer >= 0 && layer < this.layerSizeList.size(),
+        String.format("Input must be in range [0, %d]\n",
+            this.layerSizeList.size() - 1));
+    return this.layerSizeList.get(layer);
+  }
+
+  /**
+   * Get the layer size list.
+   * 
+   * @return The layer size list.
+   */
+  protected List<Integer> getLayerSizeList() {
+    return this.layerSizeList;
+  }
+
+  /**
+   * Get the weights between layer layerIdx and layerIdx + 1
+   * 
+   * @param layerIdx The index of the layer
+   * @return The weights in form of {@link DoubleMatrix}
+   */
+  public abstract DoubleMatrix getWeightsByLayer(int layerIdx);
+
+  /**
+   * Get the updated weights using one training instance.
+   * 
+   * @param trainingInstance The trainingInstance is the concatenation of
+   *          feature vector and class label vector.
+   * @return The update of each weight, in form of matrix list.
+   * @throws Exception
+   */
+  public abstract DoubleMatrix[] trainByInstance(DoubleVector trainingInstance);
+
+  /**
+   * Get the output calculated by the model.
+   * 
+   * @param instance The feature instance.
+   * @return a new vector with the result of the operation.
+   */
+  public abstract DoubleVector getOutput(DoubleVector instance);
+
+  /**
+   * Calculate the training error based on the labels and outputs.
+   * 
+   * @param labels
+   * @param output
+   */
+  protected abstract void calculateTrainingError(DoubleVector labels,
+      DoubleVector output);
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    super.readFields(input);
+    // read momentum weight
+    this.momentumWeight = input.readDouble();
+
+    // read cost function
+    this.costFunction = FunctionFactory
+        .createDoubleDoubleFunction(WritableUtils.readString(input));
+
+    // read layer size list
+    int numLayers = input.readInt();
+    this.layerSizeList = Lists.newArrayList();
+    for (int i = 0; i < numLayers; ++i) {
+      this.layerSizeList.add(input.readInt());
+    }
+
+    this.trainingMethod = WritableUtils.readEnum(input, TrainingMethod.class);
+    this.learningStyle = WritableUtils.readEnum(input, LearningStyle.class);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    super.write(output);
+    // write momentum weight
+    output.writeDouble(this.momentumWeight);
+
+    // write cost function
+    WritableUtils.writeString(output, costFunction.getFunctionName());
+
+    // write layer size list
+    output.writeInt(this.layerSizeList.size());
+    for (Integer aLayerSizeList : this.layerSizeList) {
+      output.writeInt(aLayerSizeList);
+    }
+
+    WritableUtils.writeEnum(output, this.trainingMethod);
+    WritableUtils.writeEnum(output, this.learningStyle);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java b/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
new file mode 100644
index 0000000..45f56a3
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.ml.util.DefaultFeatureTransformer;
+import org.apache.hama.ml.util.FeatureTransformer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+
+/**
+ * NeuralNetwork defines the general operations for all the derivative models.
+ * Typically, all derivative models such as Linear Regression, Logistic
+ * Regression, and Multilayer Perceptron consist of neurons and the weights
+ * between neurons.
+ * 
+ */
+public abstract class AbstractNeuralNetwork implements Writable {
+  protected HamaConfiguration conf;
+  protected FileSystem fs;
+
+  private static final double DEFAULT_LEARNING_RATE = 0.5;
+
+  protected double learningRate;
+  protected boolean learningRateDecay = false;
+
+  // the name of the model
+  protected String modelType;
+  // the path to store the model
+  protected String modelPath;
+
+  protected FeatureTransformer featureTransformer;
+
+  public AbstractNeuralNetwork() {
+    this.learningRate = DEFAULT_LEARNING_RATE;
+    this.modelType = this.getClass().getSimpleName();
+    this.featureTransformer = new DefaultFeatureTransformer();
+  }
+
+  public AbstractNeuralNetwork(String modelPath) {
+    this.modelPath = modelPath;
+  }
+
+  public AbstractNeuralNetwork(HamaConfiguration conf, String modelPath) {
+    try {
+      this.conf = conf;
+      this.fs = FileSystem.get(conf);
+      this.modelPath = modelPath;
+
+      this.readFromModel();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  public void isLearningRateDecay(boolean decay) {
+    this.learningRateDecay = decay;
+  }
+
+  public String getModelType() {
+    return this.modelType;
+  }
+
+  /**
+   * Train the model with the path of given training data and parameters.
+   * 
+   * @param dataInputPath The path of the training data.
+   * @param trainingParams The parameters for training.
+   * @throws InterruptedException 
+   * @throws ClassNotFoundException 
+   * @throws IOException
+   */
+  public BSPJob train(Configuration conf) throws ClassNotFoundException, IOException, InterruptedException {
+    Preconditions.checkArgument(this.modelPath != null,
+        "Please set the model path before training.");
+
+    // train with BSP job
+    return trainInternal((HamaConfiguration) conf);
+  }
+
+  /**
+   * Train the model with the path of given training data and parameters.
+   */
+  protected abstract BSPJob trainInternal(HamaConfiguration hamaConf)
+      throws IOException, InterruptedException, ClassNotFoundException;
+
+  /**
+   * Read the model meta-data from the specified location.
+   * 
+   * @throws IOException
+   */
+  protected void readFromModel() throws IOException {
+    Preconditions.checkArgument(this.modelPath != null,
+        "Model path has not been set.");
+    FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath)));
+    this.readFields(is);
+    Closeables.close(is, false);
+  }
+
+  /**
+   * Write the model data to specified location.
+   * 
+   * @throws IOException
+   */
+  public void writeModelToFile() throws IOException {
+    Preconditions.checkArgument(this.modelPath != null,
+        "Model path has not been set.");
+
+    FSDataOutputStream is = fs.create(new Path(this.modelPath), true);
+    this.write(is);
+
+    Closeables.close(is, false);
+  }
+
+  /**
+   * Set the model path.
+   * 
+   * @param modelPath
+   */
+  public void setModelPath(String modelPath) {
+    this.modelPath = modelPath;
+  }
+
+  /**
+   * Get the model path.
+   * 
+   * @return the path to store the model.
+   */
+  public String getModelPath() {
+    return this.modelPath;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    // read model type
+    this.modelType = WritableUtils.readString(input);
+    // read learning rate
+    this.learningRate = input.readDouble();
+    // read model path
+    this.modelPath = WritableUtils.readString(input);
+
+    if (this.modelPath.equals("null")) {
+      this.modelPath = null;
+    }
+
+    // read feature transformer
+    int bytesLen = input.readInt();
+    byte[] featureTransformerBytes = new byte[bytesLen];
+    for (int i = 0; i < featureTransformerBytes.length; ++i) {
+      featureTransformerBytes[i] = input.readByte();
+    }
+
+    Class<? extends FeatureTransformer> featureTransformerCls = (Class<? extends FeatureTransformer>) SerializationUtils
+        .deserialize(featureTransformerBytes);
+
+    Constructor[] constructors = featureTransformerCls
+        .getDeclaredConstructors();
+    Constructor constructor = constructors[0];
+
+    try {
+      this.featureTransformer = (FeatureTransformer) constructor
+          .newInstance(new Object[] {});
+    } catch (InstantiationException e) {
+      e.printStackTrace();
+    } catch (IllegalAccessException e) {
+      e.printStackTrace();
+    } catch (IllegalArgumentException e) {
+      e.printStackTrace();
+    } catch (InvocationTargetException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    // write model type
+    WritableUtils.writeString(output, modelType);
+    // write learning rate
+    output.writeDouble(learningRate);
+    // write model path
+    if (this.modelPath != null) {
+      WritableUtils.writeString(output, modelPath);
+    } else {
+      WritableUtils.writeString(output, "null");
+    }
+
+    // serialize the class
+    Class<? extends FeatureTransformer> featureTransformerCls = this.featureTransformer
+        .getClass();
+    byte[] featureTransformerBytes = SerializationUtils
+        .serialize(featureTransformerCls);
+    output.writeInt(featureTransformerBytes.length);
+    output.write(featureTransformerBytes);
+  }
+
+  public void setFeatureTransformer(FeatureTransformer featureTransformer) {
+    this.featureTransformer = featureTransformer;
+  }
+
+  public FeatureTransformer getFeatureTransformer() {
+    return this.featureTransformer;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/AbstractNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AbstractNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/core/AbstractNeuralNetworkTrainer.java
new file mode 100644
index 0000000..3547a1a
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/AbstractNeuralNetworkTrainer.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.ml.util.DefaultFeatureTransformer;
+import org.apache.hama.ml.util.FeatureTransformer;
+
+/**
+ * The trainer that is used to train the {@link LayeredNeuralNetwork} with
+ * BSP. The trainer would read the training data and obtain the trained
+ * parameters of the model.
+ * 
+ */
+public abstract class AbstractNeuralNetworkTrainer
+    extends
+    BSP<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> {
+
+  protected static final Log LOG = LogFactory
+      .getLog(AbstractNeuralNetworkTrainer.class);
+
+  protected Configuration conf;
+  protected int maxIteration;
+  protected int batchSize;
+  protected String trainingMode;
+
+  protected FeatureTransformer featureTransformer;
+
+  @Override
+  final public void setup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+      throws IOException, SyncException, InterruptedException {
+    conf = peer.getConfiguration();
+    featureTransformer = new DefaultFeatureTransformer();
+    this.extraSetup(peer);
+  }
+
+  /**
+   * Handle extra setup for sub-classes.
+   * 
+   * @param peer
+   * @throws IOException
+   * @throws SyncException
+   * @throws InterruptedException
+   */
+  protected void extraSetup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+      throws IOException, SyncException, InterruptedException {
+
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public abstract void bsp(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+      throws IOException, SyncException, InterruptedException;
+
+  @Override
+  public void cleanup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+      throws IOException {
+    this.extraCleanup(peer);
+    // write model to modelPath
+  }
+
+  /**
+   * Handle cleanup for sub-classes. Write the trained model back.
+   * 
+   * @param peer
+   * @throws IOException
+   * @throws SyncException
+   * @throws InterruptedException
+   */
+  protected void extraCleanup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+      throws IOException {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/AutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AutoEncoder.java b/src/main/java/org/apache/horn/core/AutoEncoder.java
new file mode 100644
index 0000000..f638245
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/AutoEncoder.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleFunction;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.ml.util.FeatureTransformer;
+import org.apache.horn.funcs.FunctionFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * AutoEncoder is a model used for dimensional reduction and feature learning.
+ * It is a special kind of {@link AbstractNeuralNetwork} that consists of three layers
+ * of neurons, where the first layer and third layer contains the same number of
+ * neurons.
+ * 
+ */
+public class AutoEncoder {
+
+  private final LayeredNeuralNetwork model;
+
+  /**
+   * Initialize the autoencoder.
+   * 
+   * @param inputDimensions The number of dimensions for the input feature.
+   * @param compressedDimensions The number of dimensions for the compressed
+   *          information.
+   */
+  public AutoEncoder(int inputDimensions, int compressedDimensions) {
+    model = new LayeredNeuralNetwork();
+    model.addLayer(inputDimensions, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    model.addLayer(compressedDimensions, false,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    model.addLayer(inputDimensions, true,
+        FunctionFactory.createDoubleFunction("Sigmoid"));
+    model
+        .setLearningStyle(AbstractLayeredNeuralNetwork.LearningStyle.UNSUPERVISED);
+    model.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction("SquaredError"));
+  }
+
+  public AutoEncoder(HamaConfiguration conf, String modelPath) {
+    model = new LayeredNeuralNetwork(conf, modelPath);
+  }
+
+  public AutoEncoder setModelPath(String modelPath) {
+    model.setModelPath(modelPath);
+    return this;
+  }
+
+  /**
+   * Train the autoencoder with given data. Note that the training data is
+   * pre-processed, where the features
+   * 
+   * @param dataInputPath
+   * @param trainingParams
+   * @throws InterruptedException 
+   * @throws IOException 
+   * @throws ClassNotFoundException 
+   */
+  public BSPJob train(HamaConfiguration conf, Path dataInputPath,
+      Map<String, String> trainingParams) throws ClassNotFoundException, IOException, InterruptedException {
+    return model.train(conf);
+  }
+
+  /**
+   * Train the model with one instance.
+   * 
+   * @param trainingInstance
+   */
+  public void trainOnline(DoubleVector trainingInstance) {
+    model.trainOnline(trainingInstance);
+  }
+
+  /**
+   * Get the matrix M used to encode the input features.
+   * 
+   * @return this matrix with encode the input.
+   */
+  public DoubleMatrix getEncodeWeightMatrix() {
+    return model.getWeightsByLayer(0);
+  }
+
+  /**
+   * Get the matrix M used to decode the compressed information.
+   * 
+   * @return this matrix with decode the compressed information.
+   */
+  public DoubleMatrix getDecodeWeightMatrix() {
+    return model.getWeightsByLayer(1);
+  }
+
+  /**
+   * Transform the input features.
+   * 
+   * @param inputInstance
+   * @return The compressed information.
+   */
+  private DoubleVector transform(DoubleVector inputInstance, int inputLayer) {
+    DoubleVector internalInstance = new DenseDoubleVector(
+        inputInstance.getDimension() + 1);
+    internalInstance.set(0, 1);
+    for (int i = 0; i < inputInstance.getDimension(); ++i) {
+      internalInstance.set(i + 1, inputInstance.get(i));
+    }
+    DoubleFunction squashingFunction = model.getSquashingFunction(inputLayer);
+    DoubleMatrix weightMatrix = null;
+    if (inputLayer == 0) {
+      weightMatrix = this.getEncodeWeightMatrix();
+    } else {
+      weightMatrix = this.getDecodeWeightMatrix();
+    }
+    DoubleVector vec = weightMatrix.multiplyVectorUnsafe(internalInstance);
+    vec = vec.applyToElements(squashingFunction);
+    return vec;
+  }
+
+  /**
+   * Encode the input instance.
+   * 
+   * @param inputInstance
+   * @return a new vector with the encode input instance.
+   */
+  public DoubleVector encode(DoubleVector inputInstance) {
+    Preconditions
+        .checkArgument(
+            inputInstance.getDimension() == model.getLayerSize(0) - 1,
+            String
+                .format(
+                    "The dimension of input instance is %d, but the model requires dimension %d.",
+                    inputInstance.getDimension(), model.getLayerSize(1) - 1));
+    return this.transform(inputInstance, 0);
+  }
+
+  /**
+   * Decode the input instance.
+   * 
+   * @param inputInstance
+   * @return a new vector with the decode input instance.
+   */
+  public DoubleVector decode(DoubleVector inputInstance) {
+    Preconditions
+        .checkArgument(
+            inputInstance.getDimension() == model.getLayerSize(1) - 1,
+            String
+                .format(
+                    "The dimension of input instance is %d, but the model requires dimension %d.",
+                    inputInstance.getDimension(), model.getLayerSize(1) - 1));
+    return this.transform(inputInstance, 1);
+  }
+
+  /**
+   * Get the label(s) according to the given features.
+   * 
+   * @param inputInstance
+   * @return a new vector with output of the model according to given feature
+   *         instance.
+   */
+  public DoubleVector getOutput(DoubleVector inputInstance) {
+    return model.getOutput(inputInstance);
+  }
+
+  /**
+   * Set the feature transformer.
+   * 
+   * @param featureTransformer
+   */
+  public void setFeatureTransformer(FeatureTransformer featureTransformer) {
+    this.model.setFeatureTransformer(featureTransformer);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/HornJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/HornJob.java b/src/main/java/org/apache/horn/core/HornJob.java
new file mode 100644
index 0000000..82dcad8
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/HornJob.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.commons.math.Function;
+import org.apache.horn.funcs.FunctionFactory;
+
+public class HornJob extends BSPJob {
+
+  LayeredNeuralNetwork neuralNetwork;
+
+  public HornJob(HamaConfiguration conf, Class<?> exampleClass)
+      throws IOException {
+    super(conf);
+    this.setJarByClass(exampleClass);
+
+    neuralNetwork = new LayeredNeuralNetwork();
+  }
+
+  public void inputLayer(int featureDimension, Class<? extends Function> func) {
+    addLayer(featureDimension, func);
+  }
+  
+  public void addLayer(int featureDimension, Class<? extends Function> func) {
+    neuralNetwork.addLayer(featureDimension, false,
+        FunctionFactory.createDoubleFunction(func.getSimpleName()));
+  }
+
+  public void outputLayer(int labels, Class<? extends Function> func) {
+    neuralNetwork.addLayer(labels, true,
+        FunctionFactory.createDoubleFunction(func.getSimpleName()));
+  }
+
+  public void setCostFunction(Class<? extends Function> func) {
+    neuralNetwork.setCostFunction(FunctionFactory
+        .createDoubleDoubleFunction(func.getSimpleName()));
+  }
+
+  public void setDouble(String name, double value) {
+    conf.setDouble(name, value);
+  }
+
+  public void setMaxIteration(int maxIteration) {
+    this.conf.setInt("training.max.iterations", maxIteration);
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.conf.setInt("training.batch.size", batchSize);
+  }
+
+  public void setLearningRate(double learningRate) {
+    this.conf.setDouble("mlp.learning.rate", learningRate);
+  }
+
+  public void setConvergenceCheckInterval(int n) {
+    this.conf.setInt("convergence.check.interval", n);
+  }
+
+  public void setMomentumWeight(double momentumWeight) {
+    this.conf.setDouble("mlp.momentum.weight", momentumWeight);
+  }
+
+  public LayeredNeuralNetwork getNeuralNetwork() {
+    return neuralNetwork;
+  }
+
+  public boolean waitForCompletion(boolean verbose) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    BSPJob job = neuralNetwork.train(this.conf);
+    if (verbose) {
+      return job.waitForCompletion(true);
+    } else {
+      return job.waitForCompletion(false);
+    }
+  }
+
+  public void setRegularizationWeight(double regularizationWeight) {
+    this.conf.setDouble("regularization.weight", regularizationWeight);
+  }
+
+  public void setModelPath(String modelPath) {
+    this.conf.set("model.path", modelPath);
+    neuralNetwork.setModelPath(modelPath);
+  }
+
+  public void setTrainingSetPath(String inputPath) {
+    this.conf.set("training.input.path", inputPath);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
new file mode 100644
index 0000000..afccbff
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.commons.io.MatrixWritable;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleFunction;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.util.ReflectionUtils;
+import org.apache.horn.examples.MultiLayerPerceptron.StandardNeuron;
+import org.apache.horn.funcs.FunctionFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * SmallLayeredNeuralNetwork defines the general operations for derivative
+ * layered models, include Linear Regression, Logistic Regression, Multilayer
+ * Perceptron, Autoencoder, and Restricted Boltzmann Machine, etc. For
+ * SmallLayeredNeuralNetwork, the training can be conducted in parallel, but the
+ * parameters of the models are assumes to be stored in a single machine.
+ * 
+ * In general, these models consist of neurons which are aligned in layers.
+ * Between layers, for any two adjacent layers, the neurons are connected to
+ * form a bipartite weighted graph.
+ * 
+ */
+public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
+
+  private static final Log LOG = LogFactory
+      .getLog(LayeredNeuralNetwork.class);
+
+  public static Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>> neuronClass;
+
+  /* Weights between neurons at adjacent layers */
+  protected List<DoubleMatrix> weightMatrixList;
+
+  /* Previous weight updates between neurons at adjacent layers */
+  protected List<DoubleMatrix> prevWeightUpdatesList;
+
+  /* Different layers can have different squashing function */
+  protected List<DoubleFunction> squashingFunctionList;
+
+  protected int finalLayerIdx;
+
+  protected double regularizationWeight;
+
+  public LayeredNeuralNetwork() {
+    this.layerSizeList = Lists.newArrayList();
+    this.weightMatrixList = Lists.newArrayList();
+    this.prevWeightUpdatesList = Lists.newArrayList();
+    this.squashingFunctionList = Lists.newArrayList();
+  }
+
+  public LayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
+    super(conf, modelPath);
+    this.regularizationWeight = conf.getDouble("regularization.weight", 0);
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public int addLayer(int size, boolean isFinalLayer,
+      DoubleFunction squashingFunction) {
+    Preconditions.checkArgument(size > 0,
+        "Size of layer must be larger than 0.");
+    if (!isFinalLayer) {
+      size += 1;
+    }
+
+    LOG.info("Add Layer: " + size);
+    this.layerSizeList.add(size);
+    int layerIdx = this.layerSizeList.size() - 1;
+    if (isFinalLayer) {
+      this.finalLayerIdx = layerIdx;
+    }
+
+    // add weights between current layer and previous layer, and input layer has
+    // no squashing function
+    if (layerIdx > 0) {
+      int sizePrevLayer = this.layerSizeList.get(layerIdx - 1);
+      // row count equals to size of current size and column count equals to
+      // size of previous layer
+      int row = isFinalLayer ? size : size - 1;
+      int col = sizePrevLayer;
+      DoubleMatrix weightMatrix = new DenseDoubleMatrix(row, col);
+      // initialize weights
+      weightMatrix.applyToElements(new DoubleFunction() {
+        @Override
+        public double apply(double value) {
+          return RandomUtils.nextDouble() - 0.5;
+        }
+
+        @Override
+        public double applyDerivative(double value) {
+          throw new UnsupportedOperationException("");
+        }
+      });
+      this.weightMatrixList.add(weightMatrix);
+      this.prevWeightUpdatesList.add(new DenseDoubleMatrix(row, col));
+      this.squashingFunctionList.add(squashingFunction);
+    }
+    return layerIdx;
+  }
+
+  /**
+   * Update the weight matrices with given matrices.
+   * 
+   * @param matrices
+   */
+  public void updateWeightMatrices(DoubleMatrix[] matrices) {
+    for (int i = 0; i < matrices.length; ++i) {
+      DoubleMatrix matrix = this.weightMatrixList.get(i);
+      this.weightMatrixList.set(i, matrix.add(matrices[i]));
+    }
+  }
+
+  /**
+   * Set the previous weight matrices.
+   * 
+   * @param prevUpdates
+   */
+  void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) {
+    this.prevWeightUpdatesList.clear();
+    Collections.addAll(this.prevWeightUpdatesList, prevUpdates);
+  }
+
+  /**
+   * Add a batch of matrices onto the given destination matrices.
+   * 
+   * @param destMatrices
+   * @param sourceMatrices
+   */
+  static void matricesAdd(DoubleMatrix[] destMatrices,
+      DoubleMatrix[] sourceMatrices) {
+    for (int i = 0; i < destMatrices.length; ++i) {
+      destMatrices[i] = destMatrices[i].add(sourceMatrices[i]);
+    }
+  }
+
+  /**
+   * Get all the weight matrices.
+   * 
+   * @return The matrices in form of matrix array.
+   */
+  DoubleMatrix[] getWeightMatrices() {
+    DoubleMatrix[] matrices = new DoubleMatrix[this.weightMatrixList.size()];
+    this.weightMatrixList.toArray(matrices);
+    return matrices;
+  }
+
+  /**
+   * Set the weight matrices.
+   * 
+   * @param matrices
+   */
+  public void setWeightMatrices(DoubleMatrix[] matrices) {
+    this.weightMatrixList = new ArrayList<DoubleMatrix>();
+    Collections.addAll(this.weightMatrixList, matrices);
+  }
+
+  /**
+   * Get the previous matrices updates in form of array.
+   * 
+   * @return The matrices in form of matrix array.
+   */
+  public DoubleMatrix[] getPrevMatricesUpdates() {
+    DoubleMatrix[] prevMatricesUpdates = new DoubleMatrix[this.prevWeightUpdatesList
+        .size()];
+    for (int i = 0; i < this.prevWeightUpdatesList.size(); ++i) {
+      prevMatricesUpdates[i] = this.prevWeightUpdatesList.get(i);
+    }
+    return prevMatricesUpdates;
+  }
+
+  public void setWeightMatrix(int index, DoubleMatrix matrix) {
+    Preconditions.checkArgument(
+        0 <= index && index < this.weightMatrixList.size(), String.format(
+            "index [%d] should be in range[%d, %d].", index, 0,
+            this.weightMatrixList.size()));
+    this.weightMatrixList.set(index, matrix);
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    super.readFields(input);
+
+    // read squash functions
+    int squashingFunctionSize = input.readInt();
+    this.squashingFunctionList = Lists.newArrayList();
+    for (int i = 0; i < squashingFunctionSize; ++i) {
+      this.squashingFunctionList.add(FunctionFactory
+          .createDoubleFunction(WritableUtils.readString(input)));
+    }
+
+    // read weights and construct matrices of previous updates
+    int numOfMatrices = input.readInt();
+    this.weightMatrixList = Lists.newArrayList();
+    this.prevWeightUpdatesList = Lists.newArrayList();
+    for (int i = 0; i < numOfMatrices; ++i) {
+      DoubleMatrix matrix = MatrixWritable.read(input);
+      this.weightMatrixList.add(matrix);
+      this.prevWeightUpdatesList.add(new DenseDoubleMatrix(
+          matrix.getRowCount(), matrix.getColumnCount()));
+    }
+
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    super.write(output);
+
+    // write squashing functions
+    output.writeInt(this.squashingFunctionList.size());
+    for (DoubleFunction aSquashingFunctionList : this.squashingFunctionList) {
+      WritableUtils.writeString(output,
+          aSquashingFunctionList.getFunctionName());
+    }
+
+    // write weight matrices
+    output.writeInt(this.weightMatrixList.size());
+    for (DoubleMatrix aWeightMatrixList : this.weightMatrixList) {
+      MatrixWritable.write(aWeightMatrixList, output);
+    }
+
+    // DO NOT WRITE WEIGHT UPDATE
+  }
+
+  @Override
+  public DoubleMatrix getWeightsByLayer(int layerIdx) {
+    return this.weightMatrixList.get(layerIdx);
+  }
+
+  /**
+   * Get the output of the model according to given feature instance.
+   */
+  @Override
+  public DoubleVector getOutput(DoubleVector instance) {
+    Preconditions.checkArgument(this.layerSizeList.get(0) - 1 == instance
+        .getDimension(), String.format(
+        "The dimension of input instance should be %d.",
+        this.layerSizeList.get(0) - 1));
+    // transform the features to another space
+    DoubleVector transformedInstance = this.featureTransformer
+        .transform(instance);
+    // add bias feature
+    DoubleVector instanceWithBias = new DenseDoubleVector(
+        transformedInstance.getDimension() + 1);
+    instanceWithBias.set(0, 0.99999); // set bias to be a little bit less than
+                                      // 1.0
+    for (int i = 1; i < instanceWithBias.getDimension(); ++i) {
+      instanceWithBias.set(i, transformedInstance.get(i - 1));
+    }
+
+    List<DoubleVector> outputCache = getOutputInternal(instanceWithBias);
+    // return the output of the last layer
+    DoubleVector result = outputCache.get(outputCache.size() - 1);
+    // remove bias
+    return result.sliceUnsafe(1, result.getDimension() - 1);
+  }
+
+  /**
+   * Calculate output internally, the intermediate output of each layer will be
+   * stored.
+   * 
+   * @param instanceWithBias The instance contains the features.
+   * @return Cached output of each layer.
+   */
+  public List<DoubleVector> getOutputInternal(DoubleVector instanceWithBias) {
+    List<DoubleVector> outputCache = new ArrayList<DoubleVector>();
+    // fill with instance
+    DoubleVector intermediateOutput = instanceWithBias;
+    outputCache.add(intermediateOutput);
+
+    for (int i = 0; i < this.layerSizeList.size() - 1; ++i) {
+      intermediateOutput = forward(i, intermediateOutput);
+      outputCache.add(intermediateOutput);
+    }
+
+    return outputCache;
+  }
+
+  /**
+   * @return a new neuron instance
+   */
+  public static Neuron<Synapse<DoubleWritable, DoubleWritable>> newNeuronInstance() {
+    return (Neuron<Synapse<DoubleWritable, DoubleWritable>>) ReflectionUtils
+        .newInstance(neuronClass);
+  }
+
+  /**
+   * Forward the calculation for one layer.
+   * 
+   * @param fromLayer The index of the previous layer.
+   * @param intermediateOutput The intermediateOutput of previous layer.
+   * @return a new vector with the result of the operation.
+   */
+  @SuppressWarnings("unchecked")
+  protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) {
+    DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
+
+    neuronClass = (Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>>) conf
+        .getClass("neuron.class", Neuron.class);
+
+    // TODO use the multithread processing
+    DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
+    for (int row = 0; row < weightMatrix.getRowCount(); row++) {
+      List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
+      for (int col = 0; col < weightMatrix.getColumnCount(); col++) {
+        msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
+            new DoubleWritable(intermediateOutput.get(col)),
+            new DoubleWritable(weightMatrix.get(row, col))));
+      }
+      Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
+      Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
+      n.setup(conf);
+      n.setSquashingFunction(this.squashingFunctionList.get(fromLayer));
+      try {
+        n.forward(iterable);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      vec.set(row, n.getOutput());
+    }
+
+    // add bias
+    DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
+    vecWithBias.set(0, 1);
+    for (int i = 0; i < vec.getDimension(); ++i) {
+      vecWithBias.set(i + 1, vec.get(i));
+    }
+
+    return vecWithBias;
+  }
+
+  /**
+   * Train the model online.
+   * 
+   * @param trainingInstance
+   */
+  public void trainOnline(DoubleVector trainingInstance) {
+    DoubleMatrix[] updateMatrices = this.trainByInstance(trainingInstance);
+    this.updateWeightMatrices(updateMatrices);
+  }
+
+  @Override
+  public DoubleMatrix[] trainByInstance(DoubleVector trainingInstance) {
+    DoubleVector transformedVector = this.featureTransformer
+        .transform(trainingInstance.sliceUnsafe(this.layerSizeList.get(0) - 1));
+
+    int inputDimension = this.layerSizeList.get(0) - 1;
+    int outputDimension;
+    DoubleVector inputInstance = null;
+    DoubleVector labels = null;
+    if (this.learningStyle == LearningStyle.SUPERVISED) {
+      outputDimension = this.layerSizeList.get(this.layerSizeList.size() - 1);
+      // validate training instance
+      Preconditions.checkArgument(
+          inputDimension + outputDimension == trainingInstance.getDimension(),
+          String
+              .format(
+                  "The dimension of training instance is %d, but requires %d.",
+                  trainingInstance.getDimension(), inputDimension
+                      + outputDimension));
+
+      inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
+      inputInstance.set(0, 1); // add bias
+      // get the features from the transformed vector
+      for (int i = 0; i < inputDimension; ++i) {
+        inputInstance.set(i + 1, transformedVector.get(i));
+      }
+      // get the labels from the original training instance
+      labels = trainingInstance.sliceUnsafe(inputInstance.getDimension() - 1,
+          trainingInstance.getDimension() - 1);
+    } else if (this.learningStyle == LearningStyle.UNSUPERVISED) {
+      // labels are identical to input features
+      outputDimension = inputDimension;
+      // validate training instance
+      Preconditions.checkArgument(inputDimension == trainingInstance
+          .getDimension(), String.format(
+          "The dimension of training instance is %d, but requires %d.",
+          trainingInstance.getDimension(), inputDimension));
+
+      inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
+      inputInstance.set(0, 1); // add bias
+      // get the features from the transformed vector
+      for (int i = 0; i < inputDimension; ++i) {
+        inputInstance.set(i + 1, transformedVector.get(i));
+      }
+      // get the labels by copying the transformed vector
+      labels = transformedVector.deepCopy();
+    }
+
+    List<DoubleVector> internalResults = this.getOutputInternal(inputInstance);
+    DoubleVector output = internalResults.get(internalResults.size() - 1);
+
+    // get the training error
+    calculateTrainingError(labels,
+        output.deepCopy().sliceUnsafe(1, output.getDimension() - 1));
+
+    if (this.trainingMethod.equals(TrainingMethod.GRADIENT_DESCENT)) {
+      return this.trainByInstanceGradientDescent(labels, internalResults);
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Training method is not supported."));
+    }
+  }
+
+  /**
+   * Train by gradient descent. Get the updated weights using one training
+   * instance.
+   * 
+   * @param trainingInstance
+   * @return The weight update matrices.
+   */
+  private DoubleMatrix[] trainByInstanceGradientDescent(DoubleVector labels,
+      List<DoubleVector> internalResults) {
+
+    DoubleVector output = internalResults.get(internalResults.size() - 1);
+    // initialize weight update matrices
+    DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.weightMatrixList
+        .size()];
+    for (int m = 0; m < weightUpdateMatrices.length; ++m) {
+      weightUpdateMatrices[m] = new DenseDoubleMatrix(this.weightMatrixList
+          .get(m).getRowCount(), this.weightMatrixList.get(m).getColumnCount());
+    }
+    DoubleVector deltaVec = new DenseDoubleVector(
+        this.layerSizeList.get(this.layerSizeList.size() - 1));
+
+    DoubleFunction squashingFunction = this.squashingFunctionList
+        .get(this.squashingFunctionList.size() - 1);
+
+    DoubleMatrix lastWeightMatrix = this.weightMatrixList
+        .get(this.weightMatrixList.size() - 1);
+    for (int i = 0; i < deltaVec.getDimension(); ++i) {
+      double costFuncDerivative = this.costFunction.applyDerivative(
+          labels.get(i), output.get(i + 1));
+      // add regularization
+      costFuncDerivative += this.regularizationWeight
+          * lastWeightMatrix.getRowVector(i).sum();
+      deltaVec.set(
+          i,
+          costFuncDerivative
+              * squashingFunction.applyDerivative(output.get(i + 1)));
+    }
+
+    // start from previous layer of output layer
+    for (int layer = this.layerSizeList.size() - 2; layer >= 0; --layer) {
+      output = internalResults.get(layer);
+      deltaVec = backpropagate(layer, deltaVec, internalResults,
+          weightUpdateMatrices[layer]);
+    }
+
+    this.setPrevWeightMatrices(weightUpdateMatrices);
+
+    return weightUpdateMatrices;
+  }
+
+  /**
+   * Back-propagate the errors to from next layer to current layer. The weight
+   * updated information will be stored in the weightUpdateMatrices, and the
+   * delta of the prevLayer would be returned.
+   * 
+   * @param layer Index of current layer.
+   * @param internalOutput Internal output of current layer.
+   * @param deltaVec Delta of next layer.
+   * @return the squashing function of the specified position.
+   */
+  private DoubleVector backpropagate(int curLayerIdx,
+      DoubleVector nextLayerDelta, List<DoubleVector> outputCache,
+      DenseDoubleMatrix weightUpdateMatrix) {
+
+    // get layer related information
+    DoubleVector curLayerOutput = outputCache.get(curLayerIdx);
+    DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
+    DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
+
+    // next layer is not output layer, remove the delta of bias neuron
+    if (curLayerIdx != this.layerSizeList.size() - 2) {
+      nextLayerDelta = nextLayerDelta.slice(1,
+          nextLayerDelta.getDimension() - 1);
+    }
+
+    // DoubleMatrix transposed = weightMatrix.transpose();
+    DoubleVector deltaVector = new DenseDoubleVector(
+        weightMatrix.getColumnCount());
+    for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
+      Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
+      // calls setup method
+      n.setup(conf);
+      n.setSquashingFunction(this.squashingFunctionList.get(curLayerIdx));
+      n.setOutput(curLayerOutput.get(row));
+
+      List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
+
+      n.setWeightVector(weightMatrix.getRowCount());
+
+      for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
+        // sum += (transposed.get(row, col) * nextLayerDelta.get(col));
+        msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
+            new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
+                weightMatrix.get(col, row)), new DoubleWritable(
+                prevWeightMatrix.get(col, row))));
+      }
+
+      Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
+      try {
+        n.backward(iterable);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+
+      // update weights
+      weightUpdateMatrix.setColumn(row, n.getWeights());
+      deltaVector.set(row, n.getDelta());
+    }
+
+    return deltaVector;
+  }
+
+  @Override
+  protected BSPJob trainInternal(HamaConfiguration hamaConf)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    this.conf = hamaConf;
+    this.fs = FileSystem.get(conf);
+
+    String modelPath = conf.get("model.path");
+    if (modelPath != null) {
+      this.modelPath = modelPath;
+    }
+    // modelPath must be set before training
+    if (this.modelPath == null) {
+      throw new IllegalArgumentException(
+          "Please specify the modelPath for model, "
+              + "either through setModelPath() or add 'modelPath' to the training parameters.");
+    }
+    this.writeModelToFile();
+
+    // create job
+    BSPJob job = new BSPJob(conf, LayeredNeuralNetworkTrainer.class);
+    job.setJobName("Small scale Neural Network training");
+    job.setJarByClass(LayeredNeuralNetworkTrainer.class);
+    job.setBspClass(LayeredNeuralNetworkTrainer.class);
+
+    job.getConfiguration().setClass("neuron.class", StandardNeuron.class,
+        Neuron.class);
+
+    // additional for parameter server
+    // TODO at this moment, we use 1 task as a parameter server
+    // In the future, the number of parameter server should be configurable
+    job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
+
+    job.setInputPath(new Path(conf.get("training.input.path")));
+    job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(VectorWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
+
+    return job;
+  }
+
+  @Override
+  protected void calculateTrainingError(DoubleVector labels, DoubleVector output) {
+    DoubleVector errors = labels.deepCopy().applyToElements(output,
+        this.costFunction);
+    this.trainingError = errors.sum();
+  }
+
+  /**
+   * Get the squashing function of a specified layer.
+   * 
+   * @param idx
+   * @return a new vector with the result of the operation.
+   */
+  public DoubleFunction getSquashingFunction(int idx) {
+    return this.squashingFunctionList.get(idx);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
new file mode 100644
index 0000000..effd5b0
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.ipc.RPC;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The trainer that train the {@link LayeredNeuralNetwork} based on BSP
+ * framework.
+ * 
+ */
+public final class LayeredNeuralNetworkTrainer
+    extends
+    BSP<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> {
+
+  private static final Log LOG = LogFactory
+      .getLog(LayeredNeuralNetworkTrainer.class);
+
+  /* When given peer is master worker: base of parameter merge */
+  /* When given peer is slave worker: neural network for training */
+  private LayeredNeuralNetwork inMemoryModel;
+
+  /* Job configuration */
+  private HamaConfiguration conf;
+
+  /* Default batch size */
+  private int batchSize;
+
+  /* whether it is converging or not */
+  private AtomicBoolean isConverge;
+
+  /* When given peer is master worker: Asynchronous parameter merger */
+  /* When given peer is slave worker: null */
+  private RPC.Server merger;
+
+  /* When given peer is master worker: null */
+  /* When given peer is slave worker: proxy to Asynchronous parameter merger */
+  private ParameterMerger proxy;
+
+  /**
+   * Returns true if this worker is master worker.
+   *
+   * @param peer
+   * */
+  private boolean isMaster(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
+    return peer.getPeerIndex() == peer.getNumPeers() - 1;
+  }
+
+  @Override
+  /**
+   * If the model path is specified, load the existing from storage location.
+   */
+  public void setup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
+    // At least one master & slave worker exist.
+    Preconditions.checkArgument(peer.getNumPeers() >= 2);
+    this.conf = peer.getConfiguration();
+
+    String modelPath = conf.get("model.path");
+    this.inMemoryModel = new LayeredNeuralNetwork(conf, modelPath);
+
+    this.batchSize = conf.getInt("training.batch.size", 50);
+    this.isConverge = new AtomicBoolean(false);
+
+    int slaveCount = peer.getNumPeers() - 1;
+    int mergeLimit = conf.getInt("training.max.iterations", 100000);
+    int convergenceCheckInterval = peer.getNumPeers()
+        * conf.getInt("convergence.check.interval", 2000);
+    String master = peer.getPeerName();
+    String masterAddr = master.substring(0, master.indexOf(':'));
+    int port = conf.getInt("sync.server.port", 40052);
+
+    if (isMaster(peer)) {
+      try {
+        this.merger = RPC.getServer(new ParameterMergerServer(inMemoryModel,
+            isConverge, slaveCount, mergeLimit, convergenceCheckInterval),
+            masterAddr, port, conf);
+        merger.start();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      LOG.info("Begin to train");
+    } else {
+      InetSocketAddress addr = new InetSocketAddress(masterAddr, port);
+      try {
+        this.proxy = (ParameterMerger) RPC.getProxy(ParameterMerger.class,
+            ParameterMerger.versionID, addr, conf);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Override
+  /**
+   * Write the trained model back to stored location.
+   */
+  public void cleanup(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
+    // write model to modelPath
+    if (isMaster(peer)) {
+      try {
+        LOG.info("Write model back to " + inMemoryModel.getModelPath());
+        this.inMemoryModel.writeModelToFile();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Override
+  public void bsp(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
+      throws IOException, SyncException, InterruptedException {
+    while (!this.isConverge.get()) {
+      // each slave-worker calculate the matrices updates according to local
+      // data
+      // and merge them with master
+      if (!isMaster(peer)) {
+        calculateUpdates(peer);
+      }
+    }
+
+    if (isMaster(peer)) {
+      merger.stop();
+    }
+    peer.sync(); // finalize the bsp program.
+  }
+
+  /**
+   * Calculate the matrices updates according to local partition of data.
+   * 
+   * @param peer
+   * @throws IOException
+   */
+  private void calculateUpdates(
+      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
+      throws IOException {
+
+    DoubleMatrix[] weightUpdates = new DoubleMatrix[this.inMemoryModel.weightMatrixList
+        .size()];
+    for (int i = 0; i < weightUpdates.length; ++i) {
+      int row = this.inMemoryModel.weightMatrixList.get(i).getRowCount();
+      int col = this.inMemoryModel.weightMatrixList.get(i).getColumnCount();
+      weightUpdates[i] = new DenseDoubleMatrix(row, col);
+    }
+
+    // continue to train
+    double avgTrainingError = 0.0;
+    LongWritable key = new LongWritable();
+    VectorWritable value = new VectorWritable();
+    for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) {
+      if (!peer.readNext(key, value)) {
+        peer.reopenInput();
+        peer.readNext(key, value);
+      }
+      DoubleVector trainingInstance = value.getVector();
+      LayeredNeuralNetwork.matricesAdd(weightUpdates,
+          this.inMemoryModel.trainByInstance(trainingInstance));
+      avgTrainingError += this.inMemoryModel.trainingError;
+    }
+    avgTrainingError /= batchSize;
+
+    // calculate the average of updates
+    for (int i = 0; i < weightUpdates.length; ++i) {
+      weightUpdates[i] = weightUpdates[i].divide(batchSize);
+    }
+
+    // exchange parameter update with master
+    ParameterMessage msg = new ParameterMessage(
+        avgTrainingError, false, weightUpdates,
+        this.inMemoryModel.getPrevMatricesUpdates());
+
+    ParameterMessage inMessage = proxy.merge(msg);
+    DoubleMatrix[] newWeights = inMessage.getCurMatrices();
+    DoubleMatrix[] preWeightUpdates = inMessage.getPrevMatrices();
+    this.inMemoryModel.setWeightMatrices(newWeights);
+    this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates);
+    this.isConverge.set(inMessage.isConverge());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/Neuron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/Neuron.java b/src/main/java/org/apache/horn/core/Neuron.java
new file mode 100644
index 0000000..357b42f
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/Neuron.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.commons.math.DoubleFunction;
+
+public abstract class Neuron<M extends Writable> implements NeuronInterface<M> {
+  double output;
+  double weight;
+  double delta;
+  protected DoubleFunction squashingFunction;
+
+  public void feedforward(double sum) {
+    // TODO Auto-generated method stub
+    // squashing
+    this.output = sum;
+  }
+
+  public void backpropagate(double gradient) {
+    // TODO Auto-generated method stub
+    this.delta = gradient;
+  }
+
+  public double getDelta() {
+    return delta;
+  }
+
+  public void setWeight(double weight) {
+    this.weight = weight;
+  }
+
+  public void setOutput(double output) {
+    this.output = output;
+  }
+
+  public double getOutput() {
+    return output;
+  }
+
+  // ////////* Below methods will communicate with parameter server */
+  private int i;
+
+  public void push(double weight) {
+    weights[i++] = weight;
+  }
+
+  public double getUpdate() {
+    return weight;
+  }
+
+  double[] weights;
+
+  public void setWeightVector(int rowCount) {
+    i = 0;
+    weights = new double[rowCount];
+  }
+
+  public double[] getWeights() {
+    return weights;
+  }
+
+  public void setSquashingFunction(DoubleFunction squashingFunction) {
+    this.squashingFunction = squashingFunction;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/NeuronInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/NeuronInterface.java b/src/main/java/org/apache/horn/core/NeuronInterface.java
new file mode 100644
index 0000000..5e4c113
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/NeuronInterface.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+
+public interface NeuronInterface<M extends Writable> {
+
+  public void setup(HamaConfiguration conf);
+  
+  /**
+   * This method is called when the messages are propagated from the lower
+   * layer. It can be used to determine if the neuron would activate, or fire.
+   * 
+   * @param messages
+   * @throws IOException
+   */
+  public void forward(Iterable<M> messages) throws IOException;
+
+  /**
+   * This method is called when the errors are propagated from the upper layer.
+   * It can be used to calculate the error of each neuron and change the
+   * weights.
+   * 
+   * @param messages
+   * @throws IOException
+   */
+  public void backward(Iterable<M> messages) throws IOException;
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/ParameterMerger.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/ParameterMerger.java b/src/main/java/org/apache/horn/core/ParameterMerger.java
new file mode 100644
index 0000000..512b402
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/ParameterMerger.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import org.apache.hama.ipc.VersionedProtocol;
+
+public interface ParameterMerger extends VersionedProtocol {
+  long versionID = 1L;
+
+  ParameterMessage merge(ParameterMessage msg);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/ParameterMergerServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/ParameterMergerServer.java b/src/main/java/org/apache/horn/core/ParameterMergerServer.java
new file mode 100644
index 0000000..c76a4d0
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/ParameterMergerServer.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.commons.math.DoubleMatrix;
+
+import com.google.common.base.Preconditions;
+
+public class ParameterMergerServer implements ParameterMerger {
+
+  private static final Log LOG = LogFactory.getLog(ParameterMergerServer.class);
+
+  /* The parameter merge base. */
+  protected LayeredNeuralNetwork inMemoryModel;
+
+  /* To terminate or not to terminate. */
+  protected AtomicBoolean isConverge;
+
+  /* The number of slave works that request commits. */
+  protected int SlaveCount;
+
+  /* After mergeLimit, terminate whether the result is converging or not. */
+  protected int mergeLimit;
+
+  /*
+   * last n training errors. converging is decided based on the average value of
+   * these errors.
+   */
+  protected double[] trainingErrors;
+
+  /*
+   * If the average of last n training errors is smaller than this value, it is
+   * converging.
+   */
+  protected double prevAvgTrainingError = Double.MAX_VALUE;
+
+  /* current index for trainingErrors. */
+  protected int curTrainingError = 0;
+
+  /* how many merges have been conducted? */
+  protected int mergeCount = 0;
+
+  public ParameterMergerServer(LayeredNeuralNetwork inMemoryModel,
+      AtomicBoolean isConverge, int slaveCount, int mergeLimit,
+      int convergenceCheckInterval) {
+    this.inMemoryModel = inMemoryModel;
+    this.isConverge = isConverge;
+    this.SlaveCount = slaveCount;
+    this.mergeLimit = mergeLimit;
+    this.trainingErrors = new double[convergenceCheckInterval];
+  }
+
+  @Override
+  public long getProtocolVersion(String s, long l) throws IOException {
+    return ParameterMerger.versionID;
+  }
+
+  @Override
+  public ParameterMessage merge(
+      ParameterMessage msg) {
+
+    double trainingError = msg.getTrainingError();
+    DoubleMatrix[] weightUpdates = msg.getCurMatrices();
+    DoubleMatrix[] prevWeightUpdates = msg.getPrevMatrices();
+
+    Preconditions
+        .checkArgument(weightUpdates.length == prevWeightUpdates.length);
+
+    LOG.info("Start merging: " + this.mergeCount);
+
+    if (!this.isConverge.get()) {
+      for (int i = 0; i < weightUpdates.length; ++i) {
+        weightUpdates[i] = weightUpdates[i].divide(this.SlaveCount);
+        prevWeightUpdates[i] = prevWeightUpdates[i].divide(this.SlaveCount);
+      }
+
+      synchronized (inMemoryModel) {
+        this.inMemoryModel.updateWeightMatrices(weightUpdates);
+        this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
+
+        // add trainingError to trainingErrors
+        this.trainingErrors[this.curTrainingError++] = trainingError;
+
+        // check convergence
+        if (this.trainingErrors.length == this.curTrainingError) {
+          double curAvgTrainingError = 0.0;
+          for (int i = 0; i < this.curTrainingError; ++i) {
+            curAvgTrainingError += this.trainingErrors[i];
+          }
+          curAvgTrainingError /= this.trainingErrors.length;
+
+          if (prevAvgTrainingError < curAvgTrainingError) {
+            this.isConverge.set(true);
+          } else {
+            // update
+            prevAvgTrainingError = curAvgTrainingError;
+            this.curTrainingError = 0;
+          }
+        }
+
+        if (++this.mergeCount == this.mergeLimit) {
+          this.isConverge.set(true);
+        }
+      }
+    }
+
+    return new ParameterMessage(0, this.isConverge.get(),
+        this.inMemoryModel.getWeightMatrices(),
+        this.inMemoryModel.getPrevMatricesUpdates());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/ParameterMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/ParameterMessage.java b/src/main/java/org/apache/horn/core/ParameterMessage.java
new file mode 100644
index 0000000..3905e25
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/ParameterMessage.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.commons.io.MatrixWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DoubleMatrix;
+
+/**
+ * ParameterMessage transmits the messages between workers and parameter
+ * servers during the training of neural networks.
+ * 
+ */
+public class ParameterMessage implements Writable {
+
+  protected double trainingError;
+  protected DoubleMatrix[] curMatrices;
+  protected DoubleMatrix[] prevMatrices;
+  protected boolean converge;
+
+  public ParameterMessage() {
+  }
+
+  public ParameterMessage(double trainingError, boolean converge,
+      DoubleMatrix[] weightMatrices, DoubleMatrix[] prevMatrices) {
+    this.trainingError = trainingError;
+    this.converge = converge;
+    this.curMatrices = weightMatrices;
+    this.prevMatrices = prevMatrices;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    trainingError = input.readDouble();
+    converge = input.readBoolean();
+    int numMatrices = input.readInt();
+    boolean hasPrevMatrices = input.readBoolean();
+    curMatrices = new DenseDoubleMatrix[numMatrices];
+    // read matrice updates
+    for (int i = 0; i < curMatrices.length; ++i) {
+      curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+    }
+
+    if (hasPrevMatrices) {
+      prevMatrices = new DenseDoubleMatrix[numMatrices];
+      // read previous matrices updates
+      for (int i = 0; i < prevMatrices.length; ++i) {
+        prevMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeDouble(trainingError);
+    output.writeBoolean(converge);
+    output.writeInt(curMatrices.length);
+    if (prevMatrices == null) {
+      output.writeBoolean(false);
+    } else {
+      output.writeBoolean(true);
+    }
+    for (DoubleMatrix matrix : curMatrices) {
+      MatrixWritable.write(matrix, output);
+    }
+    if (prevMatrices != null) {
+      for (DoubleMatrix matrix : prevMatrices) {
+        MatrixWritable.write(matrix, output);
+      }
+    }
+  }
+
+  public double getTrainingError() {
+    return trainingError;
+  }
+
+  public void setTrainingError(double trainingError) {
+    this.trainingError = trainingError;
+  }
+
+  public boolean isConverge() {
+    return converge;
+  }
+
+  public void setConverge(boolean converge) {
+    this.converge = converge;
+  }
+
+  public DoubleMatrix[] getCurMatrices() {
+    return curMatrices;
+  }
+
+  public void setMatrices(DoubleMatrix[] curMatrices) {
+    this.curMatrices = curMatrices;
+  }
+
+  public DoubleMatrix[] getPrevMatrices() {
+    return prevMatrices;
+  }
+
+  public void setPrevMatrices(DoubleMatrix[] prevMatrices) {
+    this.prevMatrices = prevMatrices;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/Synapse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/Synapse.java b/src/main/java/org/apache/horn/core/Synapse.java
new file mode 100644
index 0000000..714767b
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/Synapse.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Message wrapper for a propagating message
+ */
+public class Synapse<M extends Writable, W extends Writable> implements
+    Writable {
+
+  DoubleWritable message;
+  DoubleWritable weight;
+  DoubleWritable prevWeight;
+
+  public Synapse(DoubleWritable message, DoubleWritable weight) {
+    this.message = message;
+    this.weight = weight;
+  }
+
+  public Synapse(DoubleWritable message, DoubleWritable weight, DoubleWritable prevWeight) {
+    this.message = message;
+    this.weight = weight;
+    this.prevWeight = prevWeight;
+  }
+  
+  /**
+   * @return the activation or error message
+   */
+  public double getMessage() {
+    return message.get();
+  }
+
+  public double getInput() {
+    // returns the input
+    return message.get();
+  }
+  
+  public double getDelta() {
+    // returns the delta
+    return message.get();
+  }
+  
+  public double getWeight() {
+    return weight.get();
+  }
+  
+  public double getPrevWeight() {
+    return prevWeight.get();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    message.readFields(in);
+    weight.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    message.write(out);
+    weight.write(out);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
index f66344c..c3bf180 100644
--- a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
+++ b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hama.HamaConfiguration;
-import org.apache.horn.bsp.HornJob;
-import org.apache.horn.bsp.Neuron;
-import org.apache.horn.bsp.Synapse;
+import org.apache.horn.core.HornJob;
+import org.apache.horn.core.Neuron;
+import org.apache.horn.core.Synapse;
 import org.apache.horn.funcs.CrossEntropy;
 import org.apache.horn.funcs.Sigmoid;
 
@@ -101,7 +101,7 @@ public class MultiLayerPerceptron {
       InterruptedException, ClassNotFoundException {
     if (args.length < 9) {
       System.out
-          .println("Usage: model_path training_set learning_rate momentum regularization_weight feature_dimension label_dimension max_iteration num_tasks");
+          .println("Usage: <MODEL_PATH> <INPUT_PATH> <LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT> <FEATURE_DIMENSION> <LABEL_DIMENSION> <MAX_ITERATION> <NUM_TASKS>");
       System.exit(1);
     }
     HornJob ann = createJob(new HamaConfiguration(), args[0], args[1],


[4/4] incubator-horn git commit: Code refactoring

Posted by ed...@apache.org.
Code refactoring


Project: http://git-wip-us.apache.org/repos/asf/incubator-horn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-horn/commit/ac8eaf8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-horn/tree/ac8eaf8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-horn/diff/ac8eaf8e

Branch: refs/heads/master
Commit: ac8eaf8e179b147777e7f1b9312045a42df86373
Parents: 277773c
Author: Edward J. Yoon <ed...@apache.org>
Authored: Tue Apr 26 12:30:53 2016 +0900
Committer: Edward J. Yoon <ed...@apache.org>
Committed: Tue Apr 26 12:42:47 2016 +0900

----------------------------------------------------------------------
 .../horn/bsp/AbstractLayeredNeuralNetwork.java  | 221 -------
 .../java/org/apache/horn/bsp/AutoEncoder.java   | 197 ------
 src/main/java/org/apache/horn/bsp/HornJob.java  | 109 ----
 .../java/org/apache/horn/bsp/NeuralNetwork.java | 237 -------
 .../apache/horn/bsp/NeuralNetworkTrainer.java   | 106 ---
 src/main/java/org/apache/horn/bsp/Neuron.java   |  82 ---
 .../org/apache/horn/bsp/NeuronInterface.java    |  48 --
 .../org/apache/horn/bsp/ParameterMerger.java    |  27 -
 .../apache/horn/bsp/ParameterMergerServer.java  | 132 ----
 .../horn/bsp/SmallLayeredNeuralNetwork.java     | 620 ------------------
 .../bsp/SmallLayeredNeuralNetworkMessage.java   | 126 ----
 .../bsp/SmallLayeredNeuralNetworkTrainer.java   | 216 -------
 src/main/java/org/apache/horn/bsp/Synapse.java  |  85 ---
 .../horn/core/AbstractLayeredNeuralNetwork.java | 221 +++++++
 .../apache/horn/core/AbstractNeuralNetwork.java | 237 +++++++
 .../horn/core/AbstractNeuralNetworkTrainer.java | 108 ++++
 .../java/org/apache/horn/core/AutoEncoder.java  | 197 ++++++
 src/main/java/org/apache/horn/core/HornJob.java | 109 ++++
 .../apache/horn/core/LayeredNeuralNetwork.java  | 621 ++++++++++++++++++
 .../horn/core/LayeredNeuralNetworkTrainer.java  | 216 +++++++
 src/main/java/org/apache/horn/core/Neuron.java  |  82 +++
 .../org/apache/horn/core/NeuronInterface.java   |  48 ++
 .../org/apache/horn/core/ParameterMerger.java   |  27 +
 .../apache/horn/core/ParameterMergerServer.java | 132 ++++
 .../org/apache/horn/core/ParameterMessage.java  | 125 ++++
 src/main/java/org/apache/horn/core/Synapse.java |  85 +++
 .../horn/examples/MultiLayerPerceptron.java     |   8 +-
 .../org/apache/horn/examples/NeuralNetwork.java | 217 -------
 .../java/org/apache/horn/bsp/MLTestBase.java    |  64 --
 .../org/apache/horn/bsp/TestAutoEncoder.java    | 202 ------
 .../horn/bsp/TestSmallLayeredNeuralNetwork.java | 642 -------------------
 .../TestSmallLayeredNeuralNetworkMessage.java   | 172 -----
 .../java/org/apache/horn/core/MLTestBase.java   |  64 ++
 .../org/apache/horn/core/TestAutoEncoder.java   | 203 ++++++
 .../java/org/apache/horn/core/TestNeuron.java   |  93 +++
 .../core/TestSmallLayeredNeuralNetwork.java     | 642 +++++++++++++++++++
 .../TestSmallLayeredNeuralNetworkMessage.java   | 173 +++++
 .../horn/examples/MultiLayerPerceptronTest.java | 177 +++++
 .../apache/horn/examples/NeuralNetworkTest.java | 212 ------
 .../org/apache/horn/trainer/TestNeuron.java     |  93 ---
 40 files changed, 3564 insertions(+), 3812 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
deleted file mode 100644
index b18eb44..0000000
--- a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.math.DoubleDoubleFunction;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * AbstractLayeredNeuralNetwork defines the general operations for derivative
- * layered models, include Linear Regression, Logistic Regression, Multilayer
- * Perceptron, Autoencoder, and Restricted Boltzmann Machine, etc.
- * 
- * In general, these models consist of neurons which are aligned in layers.
- * Between layers, for any two adjacent layers, the neurons are connected to
- * form a bipartite weighted graph.
- * 
- */
-abstract class AbstractLayeredNeuralNetwork extends NeuralNetwork {
-
-  private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1;
-
-  double trainingError;
-
-  /* The momentumWeight */
-  protected double momentumWeight;
-
-  /* The cost function of the model */
-  protected DoubleDoubleFunction costFunction;
-
-  /* Record the size of each layer */
-  protected List<Integer> layerSizeList;
-
-  protected TrainingMethod trainingMethod;
-  
-  protected LearningStyle learningStyle;
-
-  public static enum TrainingMethod {
-    GRADIENT_DESCENT
-  }
-  
-  public static enum LearningStyle {
-    UNSUPERVISED,
-    SUPERVISED
-  }
-  
-  public AbstractLayeredNeuralNetwork() {
-    this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT;
-    this.trainingMethod = TrainingMethod.GRADIENT_DESCENT;
-    this.learningStyle = LearningStyle.SUPERVISED;
-  }
-
-  public AbstractLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
-    super(conf, modelPath);
-  }
-
-  public void setTrainingMethod(TrainingMethod method) {
-    this.trainingMethod = method;
-  }
-
-  public TrainingMethod getTrainingMethod() {
-    return this.trainingMethod;
-  }
-  
-  public void setLearningStyle(LearningStyle style) {
-    this.learningStyle = style;
-  }
-  
-  public LearningStyle getLearningStyle() {
-    return this.learningStyle;
-  }
-
-  /**
-   * Set the cost function for the model.
-   * 
-   * @param costFunction
-   */
-  public void setCostFunction(DoubleDoubleFunction costFunction) {
-    this.costFunction = costFunction;
-  }
-
-  /**
-   * Add a layer of neurons with specified size. If the added layer is not the
-   * first layer, it will automatically connects the neurons between with the
-   * previous layer.
-   * 
-   * @param size
-   * @param isFinalLayer If false, add a bias neuron.
-   * @param squashingFunction The squashing function for this layer, input layer
-   *          is f(x) = x by default.
-   * @return The layer index, starts with 0.
-   */
-  public abstract int addLayer(int size, boolean isFinalLayer,
-      DoubleFunction squashingFunction);
-
-  /**
-   * Get the size of a particular layer.
-   * 
-   * @param layer
-   * @return The layer size.
-   */
-  public int getLayerSize(int layer) {
-    Preconditions.checkArgument(
-        layer >= 0 && layer < this.layerSizeList.size(),
-        String.format("Input must be in range [0, %d]\n",
-            this.layerSizeList.size() - 1));
-    return this.layerSizeList.get(layer);
-  }
-
-  /**
-   * Get the layer size list.
-   * 
-   * @return The layer size list.
-   */
-  protected List<Integer> getLayerSizeList() {
-    return this.layerSizeList;
-  }
-
-  /**
-   * Get the weights between layer layerIdx and layerIdx + 1
-   * 
-   * @param layerIdx The index of the layer
-   * @return The weights in form of {@link DoubleMatrix}
-   */
-  public abstract DoubleMatrix getWeightsByLayer(int layerIdx);
-
-  /**
-   * Get the updated weights using one training instance.
-   * 
-   * @param trainingInstance The trainingInstance is the concatenation of
-   *          feature vector and class label vector.
-   * @return The update of each weight, in form of matrix list.
-   * @throws Exception
-   */
-  public abstract DoubleMatrix[] trainByInstance(DoubleVector trainingInstance);
-
-  /**
-   * Get the output calculated by the model.
-   * 
-   * @param instance The feature instance.
-   * @return a new vector with the result of the operation.
-   */
-  public abstract DoubleVector getOutput(DoubleVector instance);
-
-  /**
-   * Calculate the training error based on the labels and outputs.
-   * 
-   * @param labels
-   * @param output
-   */
-  protected abstract void calculateTrainingError(DoubleVector labels,
-      DoubleVector output);
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    super.readFields(input);
-    // read momentum weight
-    this.momentumWeight = input.readDouble();
-
-    // read cost function
-    this.costFunction = FunctionFactory
-        .createDoubleDoubleFunction(WritableUtils.readString(input));
-
-    // read layer size list
-    int numLayers = input.readInt();
-    this.layerSizeList = Lists.newArrayList();
-    for (int i = 0; i < numLayers; ++i) {
-      this.layerSizeList.add(input.readInt());
-    }
-
-    this.trainingMethod = WritableUtils.readEnum(input, TrainingMethod.class);
-    this.learningStyle = WritableUtils.readEnum(input, LearningStyle.class);
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    super.write(output);
-    // write momentum weight
-    output.writeDouble(this.momentumWeight);
-
-    // write cost function
-    WritableUtils.writeString(output, costFunction.getFunctionName());
-
-    // write layer size list
-    output.writeInt(this.layerSizeList.size());
-    for (Integer aLayerSizeList : this.layerSizeList) {
-      output.writeInt(aLayerSizeList);
-    }
-
-    WritableUtils.writeEnum(output, this.trainingMethod);
-    WritableUtils.writeEnum(output, this.learningStyle);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/AutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/AutoEncoder.java b/src/main/java/org/apache/horn/bsp/AutoEncoder.java
deleted file mode 100644
index 8ea2930..0000000
--- a/src/main/java/org/apache/horn/bsp/AutoEncoder.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ml.util.FeatureTransformer;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * AutoEncoder is a model used for dimensional reduction and feature learning.
- * It is a special kind of {@link NeuralNetwork} that consists of three layers
- * of neurons, where the first layer and third layer contains the same number of
- * neurons.
- * 
- */
-public class AutoEncoder {
-
-  private final SmallLayeredNeuralNetwork model;
-
-  /**
-   * Initialize the autoencoder.
-   * 
-   * @param inputDimensions The number of dimensions for the input feature.
-   * @param compressedDimensions The number of dimensions for the compressed
-   *          information.
-   */
-  public AutoEncoder(int inputDimensions, int compressedDimensions) {
-    model = new SmallLayeredNeuralNetwork();
-    model.addLayer(inputDimensions, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    model.addLayer(compressedDimensions, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    model.addLayer(inputDimensions, true,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    model
-        .setLearningStyle(AbstractLayeredNeuralNetwork.LearningStyle.UNSUPERVISED);
-    model.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("SquaredError"));
-  }
-
-  public AutoEncoder(HamaConfiguration conf, String modelPath) {
-    model = new SmallLayeredNeuralNetwork(conf, modelPath);
-  }
-
-  public AutoEncoder setModelPath(String modelPath) {
-    model.setModelPath(modelPath);
-    return this;
-  }
-
-  /**
-   * Train the autoencoder with given data. Note that the training data is
-   * pre-processed, where the features
-   * 
-   * @param dataInputPath
-   * @param trainingParams
-   * @throws InterruptedException 
-   * @throws IOException 
-   * @throws ClassNotFoundException 
-   */
-  public BSPJob train(HamaConfiguration conf, Path dataInputPath,
-      Map<String, String> trainingParams) throws ClassNotFoundException, IOException, InterruptedException {
-    return model.train(conf);
-  }
-
-  /**
-   * Train the model with one instance.
-   * 
-   * @param trainingInstance
-   */
-  public void trainOnline(DoubleVector trainingInstance) {
-    model.trainOnline(trainingInstance);
-  }
-
-  /**
-   * Get the matrix M used to encode the input features.
-   * 
-   * @return this matrix with encode the input.
-   */
-  public DoubleMatrix getEncodeWeightMatrix() {
-    return model.getWeightsByLayer(0);
-  }
-
-  /**
-   * Get the matrix M used to decode the compressed information.
-   * 
-   * @return this matrix with decode the compressed information.
-   */
-  public DoubleMatrix getDecodeWeightMatrix() {
-    return model.getWeightsByLayer(1);
-  }
-
-  /**
-   * Transform the input features.
-   * 
-   * @param inputInstance
-   * @return The compressed information.
-   */
-  private DoubleVector transform(DoubleVector inputInstance, int inputLayer) {
-    DoubleVector internalInstance = new DenseDoubleVector(
-        inputInstance.getDimension() + 1);
-    internalInstance.set(0, 1);
-    for (int i = 0; i < inputInstance.getDimension(); ++i) {
-      internalInstance.set(i + 1, inputInstance.get(i));
-    }
-    DoubleFunction squashingFunction = model.getSquashingFunction(inputLayer);
-    DoubleMatrix weightMatrix = null;
-    if (inputLayer == 0) {
-      weightMatrix = this.getEncodeWeightMatrix();
-    } else {
-      weightMatrix = this.getDecodeWeightMatrix();
-    }
-    DoubleVector vec = weightMatrix.multiplyVectorUnsafe(internalInstance);
-    vec = vec.applyToElements(squashingFunction);
-    return vec;
-  }
-
-  /**
-   * Encode the input instance.
-   * 
-   * @param inputInstance
-   * @return a new vector with the encode input instance.
-   */
-  public DoubleVector encode(DoubleVector inputInstance) {
-    Preconditions
-        .checkArgument(
-            inputInstance.getDimension() == model.getLayerSize(0) - 1,
-            String
-                .format(
-                    "The dimension of input instance is %d, but the model requires dimension %d.",
-                    inputInstance.getDimension(), model.getLayerSize(1) - 1));
-    return this.transform(inputInstance, 0);
-  }
-
-  /**
-   * Decode the input instance.
-   * 
-   * @param inputInstance
-   * @return a new vector with the decode input instance.
-   */
-  public DoubleVector decode(DoubleVector inputInstance) {
-    Preconditions
-        .checkArgument(
-            inputInstance.getDimension() == model.getLayerSize(1) - 1,
-            String
-                .format(
-                    "The dimension of input instance is %d, but the model requires dimension %d.",
-                    inputInstance.getDimension(), model.getLayerSize(1) - 1));
-    return this.transform(inputInstance, 1);
-  }
-
-  /**
-   * Get the label(s) according to the given features.
-   * 
-   * @param inputInstance
-   * @return a new vector with output of the model according to given feature
-   *         instance.
-   */
-  public DoubleVector getOutput(DoubleVector inputInstance) {
-    return model.getOutput(inputInstance);
-  }
-
-  /**
-   * Set the feature transformer.
-   * 
-   * @param featureTransformer
-   */
-  public void setFeatureTransformer(FeatureTransformer featureTransformer) {
-    this.model.setFeatureTransformer(featureTransformer);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/HornJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/HornJob.java b/src/main/java/org/apache/horn/bsp/HornJob.java
deleted file mode 100644
index 4521b87..0000000
--- a/src/main/java/org/apache/horn/bsp/HornJob.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.math.Function;
-import org.apache.horn.funcs.FunctionFactory;
-
-public class HornJob extends BSPJob {
-
-  SmallLayeredNeuralNetwork neuralNetwork;
-
-  public HornJob(HamaConfiguration conf, Class<?> exampleClass)
-      throws IOException {
-    super(conf);
-    this.setJarByClass(exampleClass);
-
-    neuralNetwork = new SmallLayeredNeuralNetwork();
-  }
-
-  public void inputLayer(int featureDimension, Class<? extends Function> func) {
-    addLayer(featureDimension, func);
-  }
-  
-  public void addLayer(int featureDimension, Class<? extends Function> func) {
-    neuralNetwork.addLayer(featureDimension, false,
-        FunctionFactory.createDoubleFunction(func.getSimpleName()));
-  }
-
-  public void outputLayer(int labels, Class<? extends Function> func) {
-    neuralNetwork.addLayer(labels, true,
-        FunctionFactory.createDoubleFunction(func.getSimpleName()));
-  }
-
-  public void setCostFunction(Class<? extends Function> func) {
-    neuralNetwork.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction(func.getSimpleName()));
-  }
-
-  public void setDouble(String name, double value) {
-    conf.setDouble(name, value);
-  }
-
-  public void setMaxIteration(int maxIteration) {
-    this.conf.setInt("training.max.iterations", maxIteration);
-  }
-
-  public void setBatchSize(int batchSize) {
-    this.conf.setInt("training.batch.size", batchSize);
-  }
-
-  public void setLearningRate(double learningRate) {
-    this.conf.setDouble("mlp.learning.rate", learningRate);
-  }
-
-  public void setConvergenceCheckInterval(int n) {
-    this.conf.setInt("convergence.check.interval", n);
-  }
-
-  public void setMomentumWeight(double momentumWeight) {
-    this.conf.setDouble("mlp.momentum.weight", momentumWeight);
-  }
-
-  public SmallLayeredNeuralNetwork getNeuralNetwork() {
-    return neuralNetwork;
-  }
-
-  public boolean waitForCompletion(boolean verbose) throws IOException,
-      InterruptedException, ClassNotFoundException {
-    BSPJob job = neuralNetwork.train(this.conf);
-    if (verbose) {
-      return job.waitForCompletion(true);
-    } else {
-      return job.waitForCompletion(false);
-    }
-  }
-
-  public void setRegularizationWeight(double regularizationWeight) {
-    this.conf.setDouble("regularization.weight", regularizationWeight);
-  }
-
-  public void setModelPath(String modelPath) {
-    this.conf.set("model.path", modelPath);
-    neuralNetwork.setModelPath(modelPath);
-  }
-
-  public void setTrainingSetPath(String inputPath) {
-    this.conf.set("training.input.path", inputPath);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java b/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
deleted file mode 100644
index 051881d..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closeables;
-
-/**
- * NeuralNetwork defines the general operations for all the derivative models.
- * Typically, all derivative models such as Linear Regression, Logistic
- * Regression, and Multilayer Perceptron consist of neurons and the weights
- * between neurons.
- * 
- */
-abstract class NeuralNetwork implements Writable {
-  protected HamaConfiguration conf;
-  protected FileSystem fs;
-
-  private static final double DEFAULT_LEARNING_RATE = 0.5;
-
-  protected double learningRate;
-  protected boolean learningRateDecay = false;
-
-  // the name of the model
-  protected String modelType;
-  // the path to store the model
-  protected String modelPath;
-
-  protected FeatureTransformer featureTransformer;
-
-  public NeuralNetwork() {
-    this.learningRate = DEFAULT_LEARNING_RATE;
-    this.modelType = this.getClass().getSimpleName();
-    this.featureTransformer = new DefaultFeatureTransformer();
-  }
-
-  public NeuralNetwork(String modelPath) {
-    this.modelPath = modelPath;
-  }
-
-  public NeuralNetwork(HamaConfiguration conf, String modelPath) {
-    try {
-      this.conf = conf;
-      this.fs = FileSystem.get(conf);
-      this.modelPath = modelPath;
-
-      this.readFromModel();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-  }
-
-  public void isLearningRateDecay(boolean decay) {
-    this.learningRateDecay = decay;
-  }
-
-  public String getModelType() {
-    return this.modelType;
-  }
-
-  /**
-   * Train the model with the path of given training data and parameters.
-   * 
-   * @param dataInputPath The path of the training data.
-   * @param trainingParams The parameters for training.
-   * @throws InterruptedException 
-   * @throws ClassNotFoundException 
-   * @throws IOException
-   */
-  public BSPJob train(Configuration conf) throws ClassNotFoundException, IOException, InterruptedException {
-    Preconditions.checkArgument(this.modelPath != null,
-        "Please set the model path before training.");
-
-    // train with BSP job
-    return trainInternal((HamaConfiguration) conf);
-  }
-
-  /**
-   * Train the model with the path of given training data and parameters.
-   */
-  protected abstract BSPJob trainInternal(HamaConfiguration hamaConf)
-      throws IOException, InterruptedException, ClassNotFoundException;
-
-  /**
-   * Read the model meta-data from the specified location.
-   * 
-   * @throws IOException
-   */
-  protected void readFromModel() throws IOException {
-    Preconditions.checkArgument(this.modelPath != null,
-        "Model path has not been set.");
-    FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath)));
-    this.readFields(is);
-    Closeables.close(is, false);
-  }
-
-  /**
-   * Write the model data to specified location.
-   * 
-   * @throws IOException
-   */
-  public void writeModelToFile() throws IOException {
-    Preconditions.checkArgument(this.modelPath != null,
-        "Model path has not been set.");
-
-    FSDataOutputStream is = fs.create(new Path(this.modelPath), true);
-    this.write(is);
-
-    Closeables.close(is, false);
-  }
-
-  /**
-   * Set the model path.
-   * 
-   * @param modelPath
-   */
-  public void setModelPath(String modelPath) {
-    this.modelPath = modelPath;
-  }
-
-  /**
-   * Get the model path.
-   * 
-   * @return the path to store the model.
-   */
-  public String getModelPath() {
-    return this.modelPath;
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    // read model type
-    this.modelType = WritableUtils.readString(input);
-    // read learning rate
-    this.learningRate = input.readDouble();
-    // read model path
-    this.modelPath = WritableUtils.readString(input);
-
-    if (this.modelPath.equals("null")) {
-      this.modelPath = null;
-    }
-
-    // read feature transformer
-    int bytesLen = input.readInt();
-    byte[] featureTransformerBytes = new byte[bytesLen];
-    for (int i = 0; i < featureTransformerBytes.length; ++i) {
-      featureTransformerBytes[i] = input.readByte();
-    }
-
-    Class<? extends FeatureTransformer> featureTransformerCls = (Class<? extends FeatureTransformer>) SerializationUtils
-        .deserialize(featureTransformerBytes);
-
-    Constructor[] constructors = featureTransformerCls
-        .getDeclaredConstructors();
-    Constructor constructor = constructors[0];
-
-    try {
-      this.featureTransformer = (FeatureTransformer) constructor
-          .newInstance(new Object[] {});
-    } catch (InstantiationException e) {
-      e.printStackTrace();
-    } catch (IllegalAccessException e) {
-      e.printStackTrace();
-    } catch (IllegalArgumentException e) {
-      e.printStackTrace();
-    } catch (InvocationTargetException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    // write model type
-    WritableUtils.writeString(output, modelType);
-    // write learning rate
-    output.writeDouble(learningRate);
-    // write model path
-    if (this.modelPath != null) {
-      WritableUtils.writeString(output, modelPath);
-    } else {
-      WritableUtils.writeString(output, "null");
-    }
-
-    // serialize the class
-    Class<? extends FeatureTransformer> featureTransformerCls = this.featureTransformer
-        .getClass();
-    byte[] featureTransformerBytes = SerializationUtils
-        .serialize(featureTransformerCls);
-    output.writeInt(featureTransformerBytes.length);
-    output.write(featureTransformerBytes);
-  }
-
-  public void setFeatureTransformer(FeatureTransformer featureTransformer) {
-    this.featureTransformer = featureTransformer;
-  }
-
-  public FeatureTransformer getFeatureTransformer() {
-    return this.featureTransformer;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java b/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
deleted file mode 100644
index 648e86b..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-
-/**
- * The trainer that is used to train the {@link SmallLayeredNeuralNetwork} with
- * BSP. The trainer would read the training data and obtain the trained
- * parameters of the model.
- * 
- */
-public abstract class NeuralNetworkTrainer extends
-    BSP<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> {
-
-  protected static final Log LOG = LogFactory
-      .getLog(NeuralNetworkTrainer.class);
-
-  protected Configuration conf;
-  protected int maxIteration;
-  protected int batchSize;
-  protected String trainingMode;
-  
-  protected FeatureTransformer featureTransformer;
-  
-  @Override
-  final public void setup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException, SyncException, InterruptedException {
-    conf = peer.getConfiguration();
-    featureTransformer = new DefaultFeatureTransformer();
-    this.extraSetup(peer);
-  }
-
-  /**
-   * Handle extra setup for sub-classes.
-   * 
-   * @param peer
-   * @throws IOException
-   * @throws SyncException
-   * @throws InterruptedException
-   */
-  protected void extraSetup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException, SyncException, InterruptedException {
-
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public abstract void bsp(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException, SyncException, InterruptedException;
-
-  @Override
-  public void cleanup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException {
-    this.extraCleanup(peer);
-    // write model to modelPath
-  }
-
-  /**
-   * Handle cleanup for sub-classes. Write the trained model back.
-   * 
-   * @param peer
-   * @throws IOException
-   * @throws SyncException
-   * @throws InterruptedException
-   */
-  protected void extraCleanup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException {
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/Neuron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/Neuron.java b/src/main/java/org/apache/horn/bsp/Neuron.java
deleted file mode 100644
index f122b6d..0000000
--- a/src/main/java/org/apache/horn/bsp/Neuron.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.commons.math.DoubleFunction;
-
-public abstract class Neuron<M extends Writable> implements NeuronInterface<M> {
-  double output;
-  double weight;
-  double delta;
-  protected DoubleFunction squashingFunction;
-
-  public void feedforward(double sum) {
-    // TODO Auto-generated method stub
-    // squashing
-    this.output = sum;
-  }
-
-  public void backpropagate(double gradient) {
-    // TODO Auto-generated method stub
-    this.delta = gradient;
-  }
-
-  public double getDelta() {
-    return delta;
-  }
-
-  public void setWeight(double weight) {
-    this.weight = weight;
-  }
-
-  public void setOutput(double output) {
-    this.output = output;
-  }
-
-  public double getOutput() {
-    return output;
-  }
-
-  // ////////* Below methods will communicate with parameter server */
-  private int i;
-
-  public void push(double weight) {
-    weights[i++] = weight;
-  }
-
-  public double getUpdate() {
-    return weight;
-  }
-
-  double[] weights;
-
-  public void setWeightVector(int rowCount) {
-    i = 0;
-    weights = new double[rowCount];
-  }
-
-  public double[] getWeights() {
-    return weights;
-  }
-
-  public void setSquashingFunction(DoubleFunction squashingFunction) {
-    this.squashingFunction = squashingFunction;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuronInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuronInterface.java b/src/main/java/org/apache/horn/bsp/NeuronInterface.java
deleted file mode 100644
index bcc1a5a..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuronInterface.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.HamaConfiguration;
-
-public interface NeuronInterface<M extends Writable> {
-
-  public void setup(HamaConfiguration conf);
-  
-  /**
-   * This method is called when the messages are propagated from the lower
-   * layer. It can be used to determine if the neuron would activate, or fire.
-   * 
-   * @param messages
-   * @throws IOException
-   */
-  public void forward(Iterable<M> messages) throws IOException;
-
-  /**
-   * This method is called when the errors are propagated from the upper layer.
-   * It can be used to calculate the error of each neuron and change the
-   * weights.
-   * 
-   * @param messages
-   * @throws IOException
-   */
-  public void backward(Iterable<M> messages) throws IOException;
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/ParameterMerger.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/ParameterMerger.java b/src/main/java/org/apache/horn/bsp/ParameterMerger.java
deleted file mode 100644
index 6df719a..0000000
--- a/src/main/java/org/apache/horn/bsp/ParameterMerger.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import org.apache.hama.ipc.VersionedProtocol;
-
-public interface ParameterMerger extends VersionedProtocol {
-  long versionID = 1L;
-
-  SmallLayeredNeuralNetworkMessage merge(SmallLayeredNeuralNetworkMessage msg);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java b/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
deleted file mode 100644
index 47aab84..0000000
--- a/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hama.commons.math.DoubleMatrix;
-
-import com.google.common.base.Preconditions;
-
-public class ParameterMergerServer implements ParameterMerger {
-
-  private static final Log LOG = LogFactory.getLog(ParameterMergerServer.class);
-
-  /* The parameter merge base. */
-  protected SmallLayeredNeuralNetwork inMemoryModel;
-
-  /* To terminate or not to terminate. */
-  protected AtomicBoolean isConverge;
-
-  /* The number of slave works that request commits. */
-  protected int SlaveCount;
-
-  /* After mergeLimit, terminate whether the result is converging or not. */
-  protected int mergeLimit;
-
-  /*
-   * last n training errors. converging is decided based on the average value of
-   * these errors.
-   */
-  protected double[] trainingErrors;
-
-  /*
-   * If the average of last n training errors is smaller than this value, it is
-   * converging.
-   */
-  protected double prevAvgTrainingError = Double.MAX_VALUE;
-
-  /* current index for trainingErrors. */
-  protected int curTrainingError = 0;
-
-  /* how many merges have been conducted? */
-  protected int mergeCount = 0;
-
-  public ParameterMergerServer(SmallLayeredNeuralNetwork inMemoryModel,
-      AtomicBoolean isConverge, int slaveCount, int mergeLimit,
-      int convergenceCheckInterval) {
-    this.inMemoryModel = inMemoryModel;
-    this.isConverge = isConverge;
-    this.SlaveCount = slaveCount;
-    this.mergeLimit = mergeLimit;
-    this.trainingErrors = new double[convergenceCheckInterval];
-  }
-
-  @Override
-  public long getProtocolVersion(String s, long l) throws IOException {
-    return ParameterMerger.versionID;
-  }
-
-  @Override
-  public SmallLayeredNeuralNetworkMessage merge(
-      SmallLayeredNeuralNetworkMessage msg) {
-
-    double trainingError = msg.getTrainingError();
-    DoubleMatrix[] weightUpdates = msg.getCurMatrices();
-    DoubleMatrix[] prevWeightUpdates = msg.getPrevMatrices();
-
-    Preconditions
-        .checkArgument(weightUpdates.length == prevWeightUpdates.length);
-
-    LOG.info("Start merging: " + this.mergeCount);
-
-    if (!this.isConverge.get()) {
-      for (int i = 0; i < weightUpdates.length; ++i) {
-        weightUpdates[i] = weightUpdates[i].divide(this.SlaveCount);
-        prevWeightUpdates[i] = prevWeightUpdates[i].divide(this.SlaveCount);
-      }
-
-      synchronized (inMemoryModel) {
-        this.inMemoryModel.updateWeightMatrices(weightUpdates);
-        this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
-
-        // add trainingError to trainingErrors
-        this.trainingErrors[this.curTrainingError++] = trainingError;
-
-        // check convergence
-        if (this.trainingErrors.length == this.curTrainingError) {
-          double curAvgTrainingError = 0.0;
-          for (int i = 0; i < this.curTrainingError; ++i) {
-            curAvgTrainingError += this.trainingErrors[i];
-          }
-          curAvgTrainingError /= this.trainingErrors.length;
-
-          if (prevAvgTrainingError < curAvgTrainingError) {
-            this.isConverge.set(true);
-          } else {
-            // update
-            prevAvgTrainingError = curAvgTrainingError;
-            this.curTrainingError = 0;
-          }
-        }
-
-        if (++this.mergeCount == this.mergeLimit) {
-          this.isConverge.set(true);
-        }
-      }
-    }
-
-    return new SmallLayeredNeuralNetworkMessage(0, this.isConverge.get(),
-        this.inMemoryModel.getWeightMatrices(),
-        this.inMemoryModel.getPrevMatricesUpdates());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
deleted file mode 100644
index 0ea8e51..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,620 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.io.MatrixWritable;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.util.ReflectionUtils;
-import org.apache.horn.examples.MultiLayerPerceptron.StandardNeuron;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * SmallLayeredNeuralNetwork defines the general operations for derivative
- * layered models, include Linear Regression, Logistic Regression, Multilayer
- * Perceptron, Autoencoder, and Restricted Boltzmann Machine, etc. For
- * SmallLayeredNeuralNetwork, the training can be conducted in parallel, but the
- * parameters of the models are assumes to be stored in a single machine.
- * 
- * In general, these models consist of neurons which are aligned in layers.
- * Between layers, for any two adjacent layers, the neurons are connected to
- * form a bipartite weighted graph.
- * 
- */
-public class SmallLayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
-
-  private static final Log LOG = LogFactory
-      .getLog(SmallLayeredNeuralNetwork.class);
-
-  public static Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>> neuronClass;
-
-  /* Weights between neurons at adjacent layers */
-  protected List<DoubleMatrix> weightMatrixList;
-
-  /* Previous weight updates between neurons at adjacent layers */
-  protected List<DoubleMatrix> prevWeightUpdatesList;
-
-  /* Different layers can have different squashing function */
-  protected List<DoubleFunction> squashingFunctionList;
-
-  protected int finalLayerIdx;
-
-  protected double regularizationWeight;
-
-  public SmallLayeredNeuralNetwork() {
-    this.layerSizeList = Lists.newArrayList();
-    this.weightMatrixList = Lists.newArrayList();
-    this.prevWeightUpdatesList = Lists.newArrayList();
-    this.squashingFunctionList = Lists.newArrayList();
-  }
-
-  public SmallLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
-    super(conf, modelPath);
-    this.regularizationWeight = conf.getDouble("regularization.weight", 0);
-  }
-
-  @Override
-  /**
-   * {@inheritDoc}
-   */
-  public int addLayer(int size, boolean isFinalLayer,
-      DoubleFunction squashingFunction) {
-    Preconditions.checkArgument(size > 0,
-        "Size of layer must be larger than 0.");
-    if (!isFinalLayer) {
-      size += 1;
-    }
-
-    LOG.info("Add Layer: " + size);
-    this.layerSizeList.add(size);
-    int layerIdx = this.layerSizeList.size() - 1;
-    if (isFinalLayer) {
-      this.finalLayerIdx = layerIdx;
-    }
-
-    // add weights between current layer and previous layer, and input layer has
-    // no squashing function
-    if (layerIdx > 0) {
-      int sizePrevLayer = this.layerSizeList.get(layerIdx - 1);
-      // row count equals to size of current size and column count equals to
-      // size of previous layer
-      int row = isFinalLayer ? size : size - 1;
-      int col = sizePrevLayer;
-      DoubleMatrix weightMatrix = new DenseDoubleMatrix(row, col);
-      // initialize weights
-      weightMatrix.applyToElements(new DoubleFunction() {
-        @Override
-        public double apply(double value) {
-          return RandomUtils.nextDouble() - 0.5;
-        }
-
-        @Override
-        public double applyDerivative(double value) {
-          throw new UnsupportedOperationException("");
-        }
-      });
-      this.weightMatrixList.add(weightMatrix);
-      this.prevWeightUpdatesList.add(new DenseDoubleMatrix(row, col));
-      this.squashingFunctionList.add(squashingFunction);
-    }
-    return layerIdx;
-  }
-
-  /**
-   * Update the weight matrices with given matrices.
-   * 
-   * @param matrices
-   */
-  public void updateWeightMatrices(DoubleMatrix[] matrices) {
-    for (int i = 0; i < matrices.length; ++i) {
-      DoubleMatrix matrix = this.weightMatrixList.get(i);
-      this.weightMatrixList.set(i, matrix.add(matrices[i]));
-    }
-  }
-
-  /**
-   * Set the previous weight matrices.
-   * 
-   * @param prevUpdates
-   */
-  void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) {
-    this.prevWeightUpdatesList.clear();
-    Collections.addAll(this.prevWeightUpdatesList, prevUpdates);
-  }
-
-  /**
-   * Add a batch of matrices onto the given destination matrices.
-   * 
-   * @param destMatrices
-   * @param sourceMatrices
-   */
-  static void matricesAdd(DoubleMatrix[] destMatrices,
-      DoubleMatrix[] sourceMatrices) {
-    for (int i = 0; i < destMatrices.length; ++i) {
-      destMatrices[i] = destMatrices[i].add(sourceMatrices[i]);
-    }
-  }
-
-  /**
-   * Get all the weight matrices.
-   * 
-   * @return The matrices in form of matrix array.
-   */
-  DoubleMatrix[] getWeightMatrices() {
-    DoubleMatrix[] matrices = new DoubleMatrix[this.weightMatrixList.size()];
-    this.weightMatrixList.toArray(matrices);
-    return matrices;
-  }
-
-  /**
-   * Set the weight matrices.
-   * 
-   * @param matrices
-   */
-  public void setWeightMatrices(DoubleMatrix[] matrices) {
-    this.weightMatrixList = new ArrayList<DoubleMatrix>();
-    Collections.addAll(this.weightMatrixList, matrices);
-  }
-
-  /**
-   * Get the previous matrices updates in form of array.
-   * 
-   * @return The matrices in form of matrix array.
-   */
-  public DoubleMatrix[] getPrevMatricesUpdates() {
-    DoubleMatrix[] prevMatricesUpdates = new DoubleMatrix[this.prevWeightUpdatesList
-        .size()];
-    for (int i = 0; i < this.prevWeightUpdatesList.size(); ++i) {
-      prevMatricesUpdates[i] = this.prevWeightUpdatesList.get(i);
-    }
-    return prevMatricesUpdates;
-  }
-
-  public void setWeightMatrix(int index, DoubleMatrix matrix) {
-    Preconditions.checkArgument(
-        0 <= index && index < this.weightMatrixList.size(), String.format(
-            "index [%d] should be in range[%d, %d].", index, 0,
-            this.weightMatrixList.size()));
-    this.weightMatrixList.set(index, matrix);
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    super.readFields(input);
-
-    // read squash functions
-    int squashingFunctionSize = input.readInt();
-    this.squashingFunctionList = Lists.newArrayList();
-    for (int i = 0; i < squashingFunctionSize; ++i) {
-      this.squashingFunctionList.add(FunctionFactory
-          .createDoubleFunction(WritableUtils.readString(input)));
-    }
-
-    // read weights and construct matrices of previous updates
-    int numOfMatrices = input.readInt();
-    this.weightMatrixList = Lists.newArrayList();
-    this.prevWeightUpdatesList = Lists.newArrayList();
-    for (int i = 0; i < numOfMatrices; ++i) {
-      DoubleMatrix matrix = MatrixWritable.read(input);
-      this.weightMatrixList.add(matrix);
-      this.prevWeightUpdatesList.add(new DenseDoubleMatrix(
-          matrix.getRowCount(), matrix.getColumnCount()));
-    }
-
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    super.write(output);
-
-    // write squashing functions
-    output.writeInt(this.squashingFunctionList.size());
-    for (DoubleFunction aSquashingFunctionList : this.squashingFunctionList) {
-      WritableUtils.writeString(output,
-          aSquashingFunctionList.getFunctionName());
-    }
-
-    // write weight matrices
-    output.writeInt(this.weightMatrixList.size());
-    for (DoubleMatrix aWeightMatrixList : this.weightMatrixList) {
-      MatrixWritable.write(aWeightMatrixList, output);
-    }
-
-    // DO NOT WRITE WEIGHT UPDATE
-  }
-
-  @Override
-  public DoubleMatrix getWeightsByLayer(int layerIdx) {
-    return this.weightMatrixList.get(layerIdx);
-  }
-
-  /**
-   * Get the output of the model according to given feature instance.
-   */
-  @Override
-  public DoubleVector getOutput(DoubleVector instance) {
-    Preconditions.checkArgument(this.layerSizeList.get(0) - 1 == instance
-        .getDimension(), String.format(
-        "The dimension of input instance should be %d.",
-        this.layerSizeList.get(0) - 1));
-    // transform the features to another space
-    DoubleVector transformedInstance = this.featureTransformer
-        .transform(instance);
-    // add bias feature
-    DoubleVector instanceWithBias = new DenseDoubleVector(
-        transformedInstance.getDimension() + 1);
-    instanceWithBias.set(0, 0.99999); // set bias to be a little bit less than
-                                      // 1.0
-    for (int i = 1; i < instanceWithBias.getDimension(); ++i) {
-      instanceWithBias.set(i, transformedInstance.get(i - 1));
-    }
-
-    List<DoubleVector> outputCache = getOutputInternal(instanceWithBias);
-    // return the output of the last layer
-    DoubleVector result = outputCache.get(outputCache.size() - 1);
-    // remove bias
-    return result.sliceUnsafe(1, result.getDimension() - 1);
-  }
-
-  /**
-   * Calculate output internally, the intermediate output of each layer will be
-   * stored.
-   * 
-   * @param instanceWithBias The instance contains the features.
-   * @return Cached output of each layer.
-   */
-  public List<DoubleVector> getOutputInternal(DoubleVector instanceWithBias) {
-    List<DoubleVector> outputCache = new ArrayList<DoubleVector>();
-    // fill with instance
-    DoubleVector intermediateOutput = instanceWithBias;
-    outputCache.add(intermediateOutput);
-
-    for (int i = 0; i < this.layerSizeList.size() - 1; ++i) {
-      intermediateOutput = forward(i, intermediateOutput);
-      outputCache.add(intermediateOutput);
-    }
-
-    return outputCache;
-  }
-
-  /**
-   * @return a new neuron instance
-   */
-  public static Neuron<Synapse<DoubleWritable, DoubleWritable>> newNeuronInstance() {
-    return (Neuron<Synapse<DoubleWritable, DoubleWritable>>) ReflectionUtils
-        .newInstance(neuronClass);
-  }
-
-  /**
-   * Forward the calculation for one layer.
-   * 
-   * @param fromLayer The index of the previous layer.
-   * @param intermediateOutput The intermediateOutput of previous layer.
-   * @return a new vector with the result of the operation.
-   */
-  protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) {
-    DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
-
-    neuronClass = (Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>>) conf
-        .getClass("neuron.class", Neuron.class);
-
-    // TODO use the multithread processing
-    DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
-    for (int row = 0; row < weightMatrix.getRowCount(); row++) {
-      List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
-      for (int col = 0; col < weightMatrix.getColumnCount(); col++) {
-        msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
-            new DoubleWritable(intermediateOutput.get(col)),
-            new DoubleWritable(weightMatrix.get(row, col))));
-      }
-      Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
-      Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
-      n.setup(conf);
-      n.setSquashingFunction(this.squashingFunctionList.get(fromLayer));
-      try {
-        n.forward(iterable);
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-      vec.set(row, n.getOutput());
-    }
-
-    // add bias
-    DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
-    vecWithBias.set(0, 1);
-    for (int i = 0; i < vec.getDimension(); ++i) {
-      vecWithBias.set(i + 1, vec.get(i));
-    }
-
-    return vecWithBias;
-  }
-
-  /**
-   * Train the model online.
-   * 
-   * @param trainingInstance
-   */
-  public void trainOnline(DoubleVector trainingInstance) {
-    DoubleMatrix[] updateMatrices = this.trainByInstance(trainingInstance);
-    this.updateWeightMatrices(updateMatrices);
-  }
-
-  @Override
-  public DoubleMatrix[] trainByInstance(DoubleVector trainingInstance) {
-    DoubleVector transformedVector = this.featureTransformer
-        .transform(trainingInstance.sliceUnsafe(this.layerSizeList.get(0) - 1));
-
-    int inputDimension = this.layerSizeList.get(0) - 1;
-    int outputDimension;
-    DoubleVector inputInstance = null;
-    DoubleVector labels = null;
-    if (this.learningStyle == LearningStyle.SUPERVISED) {
-      outputDimension = this.layerSizeList.get(this.layerSizeList.size() - 1);
-      // validate training instance
-      Preconditions.checkArgument(
-          inputDimension + outputDimension == trainingInstance.getDimension(),
-          String
-              .format(
-                  "The dimension of training instance is %d, but requires %d.",
-                  trainingInstance.getDimension(), inputDimension
-                      + outputDimension));
-
-      inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
-      inputInstance.set(0, 1); // add bias
-      // get the features from the transformed vector
-      for (int i = 0; i < inputDimension; ++i) {
-        inputInstance.set(i + 1, transformedVector.get(i));
-      }
-      // get the labels from the original training instance
-      labels = trainingInstance.sliceUnsafe(inputInstance.getDimension() - 1,
-          trainingInstance.getDimension() - 1);
-    } else if (this.learningStyle == LearningStyle.UNSUPERVISED) {
-      // labels are identical to input features
-      outputDimension = inputDimension;
-      // validate training instance
-      Preconditions.checkArgument(inputDimension == trainingInstance
-          .getDimension(), String.format(
-          "The dimension of training instance is %d, but requires %d.",
-          trainingInstance.getDimension(), inputDimension));
-
-      inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
-      inputInstance.set(0, 1); // add bias
-      // get the features from the transformed vector
-      for (int i = 0; i < inputDimension; ++i) {
-        inputInstance.set(i + 1, transformedVector.get(i));
-      }
-      // get the labels by copying the transformed vector
-      labels = transformedVector.deepCopy();
-    }
-
-    List<DoubleVector> internalResults = this.getOutputInternal(inputInstance);
-    DoubleVector output = internalResults.get(internalResults.size() - 1);
-
-    // get the training error
-    calculateTrainingError(labels,
-        output.deepCopy().sliceUnsafe(1, output.getDimension() - 1));
-
-    if (this.trainingMethod.equals(TrainingMethod.GRADIENT_DESCENT)) {
-      return this.trainByInstanceGradientDescent(labels, internalResults);
-    } else {
-      throw new IllegalArgumentException(
-          String.format("Training method is not supported."));
-    }
-  }
-
-  /**
-   * Train by gradient descent. Get the updated weights using one training
-   * instance.
-   * 
-   * @param trainingInstance
-   * @return The weight update matrices.
-   */
-  private DoubleMatrix[] trainByInstanceGradientDescent(DoubleVector labels,
-      List<DoubleVector> internalResults) {
-
-    DoubleVector output = internalResults.get(internalResults.size() - 1);
-    // initialize weight update matrices
-    DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.weightMatrixList
-        .size()];
-    for (int m = 0; m < weightUpdateMatrices.length; ++m) {
-      weightUpdateMatrices[m] = new DenseDoubleMatrix(this.weightMatrixList
-          .get(m).getRowCount(), this.weightMatrixList.get(m).getColumnCount());
-    }
-    DoubleVector deltaVec = new DenseDoubleVector(
-        this.layerSizeList.get(this.layerSizeList.size() - 1));
-
-    DoubleFunction squashingFunction = this.squashingFunctionList
-        .get(this.squashingFunctionList.size() - 1);
-
-    DoubleMatrix lastWeightMatrix = this.weightMatrixList
-        .get(this.weightMatrixList.size() - 1);
-    for (int i = 0; i < deltaVec.getDimension(); ++i) {
-      double costFuncDerivative = this.costFunction.applyDerivative(
-          labels.get(i), output.get(i + 1));
-      // add regularization
-      costFuncDerivative += this.regularizationWeight
-          * lastWeightMatrix.getRowVector(i).sum();
-      deltaVec.set(
-          i,
-          costFuncDerivative
-              * squashingFunction.applyDerivative(output.get(i + 1)));
-    }
-
-    // start from previous layer of output layer
-    for (int layer = this.layerSizeList.size() - 2; layer >= 0; --layer) {
-      output = internalResults.get(layer);
-      deltaVec = backpropagate(layer, deltaVec, internalResults,
-          weightUpdateMatrices[layer]);
-    }
-
-    this.setPrevWeightMatrices(weightUpdateMatrices);
-
-    return weightUpdateMatrices;
-  }
-
-  /**
-   * Back-propagate the errors to from next layer to current layer. The weight
-   * updated information will be stored in the weightUpdateMatrices, and the
-   * delta of the prevLayer would be returned.
-   * 
-   * @param layer Index of current layer.
-   * @param internalOutput Internal output of current layer.
-   * @param deltaVec Delta of next layer.
-   * @return the squashing function of the specified position.
-   */
-  private DoubleVector backpropagate(int curLayerIdx,
-      DoubleVector nextLayerDelta, List<DoubleVector> outputCache,
-      DenseDoubleMatrix weightUpdateMatrix) {
-
-    // get layer related information
-    DoubleVector curLayerOutput = outputCache.get(curLayerIdx);
-    DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
-    DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
-
-    // next layer is not output layer, remove the delta of bias neuron
-    if (curLayerIdx != this.layerSizeList.size() - 2) {
-      nextLayerDelta = nextLayerDelta.slice(1,
-          nextLayerDelta.getDimension() - 1);
-    }
-
-    // DoubleMatrix transposed = weightMatrix.transpose();
-    DoubleVector deltaVector = new DenseDoubleVector(
-        weightMatrix.getColumnCount());
-    for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
-      Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
-      // calls setup method
-      n.setup(conf);
-      n.setSquashingFunction(this.squashingFunctionList.get(curLayerIdx));
-      n.setOutput(curLayerOutput.get(row));
-
-      List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
-
-      n.setWeightVector(weightMatrix.getRowCount());
-
-      for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
-        // sum += (transposed.get(row, col) * nextLayerDelta.get(col));
-        msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
-            new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
-                weightMatrix.get(col, row)), new DoubleWritable(
-                prevWeightMatrix.get(col, row))));
-      }
-
-      Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
-      try {
-        n.backward(iterable);
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-
-      // update weights
-      weightUpdateMatrix.setColumn(row, n.getWeights());
-      deltaVector.set(row, n.getDelta());
-    }
-
-    return deltaVector;
-  }
-
-  @Override
-  protected BSPJob trainInternal(HamaConfiguration hamaConf)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    this.conf = hamaConf;
-    this.fs = FileSystem.get(conf);
-
-    String modelPath = conf.get("model.path");
-    if (modelPath != null) {
-      this.modelPath = modelPath;
-    }
-    // modelPath must be set before training
-    if (this.modelPath == null) {
-      throw new IllegalArgumentException(
-          "Please specify the modelPath for model, "
-              + "either through setModelPath() or add 'modelPath' to the training parameters.");
-    }
-    this.writeModelToFile();
-
-    // create job
-    BSPJob job = new BSPJob(conf, SmallLayeredNeuralNetworkTrainer.class);
-    job.setJobName("Small scale Neural Network training");
-    job.setJarByClass(SmallLayeredNeuralNetworkTrainer.class);
-    job.setBspClass(SmallLayeredNeuralNetworkTrainer.class);
-
-    job.getConfiguration().setClass("neuron.class", StandardNeuron.class,
-        Neuron.class);
-
-    // additional for parameter server
-    // TODO at this moment, we use 1 task as a parameter server
-    // In the future, the number of parameter server should be configurable
-    job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
-
-    job.setInputPath(new Path(conf.get("training.input.path")));
-    job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
-    job.setInputKeyClass(LongWritable.class);
-    job.setInputValueClass(VectorWritable.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(NullWritable.class);
-    job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
-
-    return job;
-  }
-
-  @Override
-  protected void calculateTrainingError(DoubleVector labels, DoubleVector output) {
-    DoubleVector errors = labels.deepCopy().applyToElements(output,
-        this.costFunction);
-    this.trainingError = errors.sum();
-  }
-
-  /**
-   * Get the squashing function of a specified layer.
-   * 
-   * @param idx
-   * @return a new vector with the result of the operation.
-   */
-  public DoubleFunction getSquashingFunction(int idx) {
-    return this.squashingFunctionList.get(idx);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
deleted file mode 100644
index 2f8c287..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.commons.io.MatrixWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-
-/**
- * NeuralNetworkMessage transmits the messages between peers during the training
- * of neural networks.
- * 
- */
-public class SmallLayeredNeuralNetworkMessage implements Writable {
-
-  protected double trainingError;
-  protected DoubleMatrix[] curMatrices;
-  protected DoubleMatrix[] prevMatrices;
-  protected boolean converge;
-
-  public SmallLayeredNeuralNetworkMessage() {
-  }
-  
-  public SmallLayeredNeuralNetworkMessage(double trainingError,
-      boolean converge, DoubleMatrix[] weightMatrices,
-      DoubleMatrix[] prevMatrices) {
-    this.trainingError = trainingError;
-    this.converge = converge;
-    this.curMatrices = weightMatrices;
-    this.prevMatrices = prevMatrices;
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    trainingError = input.readDouble();
-    converge = input.readBoolean();
-    int numMatrices = input.readInt();
-    boolean hasPrevMatrices = input.readBoolean();
-    curMatrices = new DenseDoubleMatrix[numMatrices];
-    // read matrice updates
-    for (int i = 0; i < curMatrices.length; ++i) {
-      curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
-    }
-
-    if (hasPrevMatrices) {
-      prevMatrices = new DenseDoubleMatrix[numMatrices];
-      // read previous matrices updates
-      for (int i = 0; i < prevMatrices.length; ++i) {
-        prevMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
-      }
-    }
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    output.writeDouble(trainingError);
-    output.writeBoolean(converge);
-    output.writeInt(curMatrices.length);
-    if (prevMatrices == null) {
-      output.writeBoolean(false);
-    } else {
-      output.writeBoolean(true);
-    }
-    for (DoubleMatrix matrix : curMatrices) {
-      MatrixWritable.write(matrix, output);
-    }
-    if (prevMatrices != null) {
-      for (DoubleMatrix matrix : prevMatrices) {
-        MatrixWritable.write(matrix, output);
-      }
-    }
-  }
-
-  public double getTrainingError() {
-    return trainingError;
-  }
-
-  public void setTrainingError(double trainingError) {
-    this.trainingError = trainingError;
-  }
-
-  public boolean isConverge() {
-    return converge;
-  }
-
-  public void setConverge(boolean converge) {
-    this.converge = converge;
-  }
-
-  public DoubleMatrix[] getCurMatrices() {
-    return curMatrices;
-  }
-
-  public void setMatrices(DoubleMatrix[] curMatrices) {
-    this.curMatrices = curMatrices;
-  }
-
-  public DoubleMatrix[] getPrevMatrices() {
-    return prevMatrices;
-  }
-
-  public void setPrevMatrices(DoubleMatrix[] prevMatrices) {
-    this.prevMatrices = prevMatrices;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
deleted file mode 100644
index c3e258c..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ipc.RPC;
-
-import com.google.common.base.Preconditions;
-
-/**
- * The trainer that train the {@link SmallLayeredNeuralNetwork} based on BSP
- * framework.
- * 
- */
-public final class SmallLayeredNeuralNetworkTrainer
-    extends
-    BSP<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> {
-
-  private static final Log LOG = LogFactory
-      .getLog(SmallLayeredNeuralNetworkTrainer.class);
-
-  /* When given peer is master worker: base of parameter merge */
-  /* When given peer is slave worker: neural network for training */
-  private SmallLayeredNeuralNetwork inMemoryModel;
-
-  /* Job configuration */
-  private HamaConfiguration conf;
-
-  /* Default batch size */
-  private int batchSize;
-
-  /* whether it is converging or not */
-  private AtomicBoolean isConverge;
-
-  /* When given peer is master worker: Asynchronous parameter merger */
-  /* When given peer is slave worker: null */
-  private RPC.Server merger;
-
-  /* When given peer is master worker: null */
-  /* When given peer is slave worker: proxy to Asynchronous parameter merger */
-  private ParameterMerger proxy;
-
-  /**
-   * Returns true if this worker is master worker.
-   *
-   * @param peer
-   * */
-  private boolean isMaster(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
-    return peer.getPeerIndex() == peer.getNumPeers() - 1;
-  }
-
-  @Override
-  /**
-   * If the model path is specified, load the existing from storage location.
-   */
-  public void setup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
-    // At least one master & slave worker exist.
-    Preconditions.checkArgument(peer.getNumPeers() >= 2);
-    this.conf = peer.getConfiguration();
-
-    String modelPath = conf.get("model.path");
-    this.inMemoryModel = new SmallLayeredNeuralNetwork(conf, modelPath);
-
-    this.batchSize = conf.getInt("training.batch.size", 50);
-    this.isConverge = new AtomicBoolean(false);
-
-    int slaveCount = peer.getNumPeers() - 1;
-    int mergeLimit = conf.getInt("training.max.iterations", 100000);
-    int convergenceCheckInterval = peer.getNumPeers()
-        * conf.getInt("convergence.check.interval", 2000);
-    String master = peer.getPeerName();
-    String masterAddr = master.substring(0, master.indexOf(':'));
-    int port = conf.getInt("sync.server.port", 40052);
-
-    if (isMaster(peer)) {
-      try {
-        this.merger = RPC.getServer(new ParameterMergerServer(inMemoryModel,
-            isConverge, slaveCount, mergeLimit, convergenceCheckInterval),
-            masterAddr, port, conf);
-        merger.start();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-      LOG.info("Begin to train");
-    } else {
-      InetSocketAddress addr = new InetSocketAddress(masterAddr, port);
-      try {
-        this.proxy = (ParameterMerger) RPC.getProxy(ParameterMerger.class,
-            ParameterMerger.versionID, addr, conf);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  @Override
-  /**
-   * Write the trained model back to stored location.
-   */
-  public void cleanup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
-    // write model to modelPath
-    if (isMaster(peer)) {
-      try {
-        LOG.info("Write model back to " + inMemoryModel.getModelPath());
-        this.inMemoryModel.writeModelToFile();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  @Override
-  public void bsp(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer)
-      throws IOException, SyncException, InterruptedException {
-    while (!this.isConverge.get()) {
-      // each slave-worker calculate the matrices updates according to local
-      // data
-      // and merge them with master
-      if (!isMaster(peer)) {
-        calculateUpdates(peer);
-      }
-    }
-
-    if (isMaster(peer)) {
-      merger.stop();
-    }
-    peer.sync(); // finalize the bsp program.
-  }
-
-  /**
-   * Calculate the matrices updates according to local partition of data.
-   * 
-   * @param peer
-   * @throws IOException
-   */
-  private void calculateUpdates(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer)
-      throws IOException {
-
-    DoubleMatrix[] weightUpdates = new DoubleMatrix[this.inMemoryModel.weightMatrixList
-        .size()];
-    for (int i = 0; i < weightUpdates.length; ++i) {
-      int row = this.inMemoryModel.weightMatrixList.get(i).getRowCount();
-      int col = this.inMemoryModel.weightMatrixList.get(i).getColumnCount();
-      weightUpdates[i] = new DenseDoubleMatrix(row, col);
-    }
-
-    // continue to train
-    double avgTrainingError = 0.0;
-    LongWritable key = new LongWritable();
-    VectorWritable value = new VectorWritable();
-    for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) {
-      if (!peer.readNext(key, value)) {
-        peer.reopenInput();
-        peer.readNext(key, value);
-      }
-      DoubleVector trainingInstance = value.getVector();
-      SmallLayeredNeuralNetwork.matricesAdd(weightUpdates,
-          this.inMemoryModel.trainByInstance(trainingInstance));
-      avgTrainingError += this.inMemoryModel.trainingError;
-    }
-    avgTrainingError /= batchSize;
-
-    // calculate the average of updates
-    for (int i = 0; i < weightUpdates.length; ++i) {
-      weightUpdates[i] = weightUpdates[i].divide(batchSize);
-    }
-
-    // exchange parameter update with master
-    SmallLayeredNeuralNetworkMessage msg = new SmallLayeredNeuralNetworkMessage(
-        avgTrainingError, false, weightUpdates,
-        this.inMemoryModel.getPrevMatricesUpdates());
-
-    SmallLayeredNeuralNetworkMessage inMessage = proxy.merge(msg);
-    DoubleMatrix[] newWeights = inMessage.getCurMatrices();
-    DoubleMatrix[] preWeightUpdates = inMessage.getPrevMatrices();
-    this.inMemoryModel.setWeightMatrices(newWeights);
-    this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates);
-    this.isConverge.set(inMessage.isConverge());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/Synapse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/Synapse.java b/src/main/java/org/apache/horn/bsp/Synapse.java
deleted file mode 100644
index 61725f9..0000000
--- a/src/main/java/org/apache/horn/bsp/Synapse.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Message wrapper for a propagating message
- */
-public class Synapse<M extends Writable, W extends Writable> implements
-    Writable {
-
-  DoubleWritable message;
-  DoubleWritable weight;
-  DoubleWritable prevWeight;
-
-  public Synapse(DoubleWritable message, DoubleWritable weight) {
-    this.message = message;
-    this.weight = weight;
-  }
-
-  public Synapse(DoubleWritable message, DoubleWritable weight, DoubleWritable prevWeight) {
-    this.message = message;
-    this.weight = weight;
-    this.prevWeight = prevWeight;
-  }
-  
-  /**
-   * @return the activation or error message
-   */
-  public double getMessage() {
-    return message.get();
-  }
-
-  public double getInput() {
-    // returns the input
-    return message.get();
-  }
-  
-  public double getDelta() {
-    // returns the delta
-    return message.get();
-  }
-  
-  public double getWeight() {
-    return weight.get();
-  }
-  
-  public double getPrevWeight() {
-    return prevWeight.get();
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    message.readFields(in);
-    weight.readFields(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    message.write(out);
-    weight.write(out);
-  }
-
-}
\ No newline at end of file


[2/4] incubator-horn git commit: Code refactoring

Posted by ed...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/examples/NeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/examples/NeuralNetwork.java b/src/main/java/org/apache/horn/examples/NeuralNetwork.java
deleted file mode 100644
index 5c0afdf..0000000
--- a/src/main/java/org/apache/horn/examples/NeuralNetwork.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.examples;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.horn.bsp.SmallLayeredNeuralNetwork;
-import org.apache.horn.funcs.FunctionFactory;
-
-/**
- * The example of using {@link SmallLayeredNeuralNetwork}, including the
- * training phase and labeling phase.
- */
-public class NeuralNetwork {
-
-  public static void main(String[] args) throws Exception {
-    if (args.length < 3) {
-      printUsage();
-      return;
-    }
-    HamaConfiguration conf = new HamaConfiguration();
-    String mode = args[0];
-    
-    if (mode.equalsIgnoreCase("label")) {
-      if (args.length < 4) {
-        printUsage();
-        return;
-      }
-
-      String featureDataPath = args[1];
-      String resultDataPath = args[2];
-      String modelPath = args[3];
-
-      SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(conf, modelPath);
-
-      // process data in streaming approach
-      FileSystem fs = FileSystem.get(new URI(featureDataPath), conf);
-      BufferedReader br = new BufferedReader(new InputStreamReader(
-          fs.open(new Path(featureDataPath))));
-      Path outputPath = new Path(resultDataPath);
-      if (fs.exists(outputPath)) {
-        fs.delete(outputPath, true);
-      }
-      BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
-          fs.create(outputPath)));
-
-      String line = null;
-
-      while ((line = br.readLine()) != null) {
-        if (line.trim().length() == 0) {
-          continue;
-        }
-        String[] tokens = line.trim().split(",");
-        double[] vals = new double[tokens.length];
-        for (int i = 0; i < tokens.length; ++i) {
-          vals[i] = Double.parseDouble(tokens[i]);
-        }
-        DoubleVector instance = new DenseDoubleVector(vals);
-        DoubleVector result = ann.getOutput(instance);
-        double[] arrResult = result.toArray();
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < arrResult.length; ++i) {
-          sb.append(arrResult[i]);
-          if (i != arrResult.length - 1) {
-            sb.append(",");
-          } else {
-            sb.append("\n");
-          }
-        }
-        bw.write(sb.toString());
-      }
-
-      br.close();
-      bw.close();
-    } else if (mode.equals("train")) {
-      if (args.length < 5) {
-        printUsage();
-        return;
-      }
-
-      String trainingDataPath = args[1];
-      String trainedModelPath = args[2];
-
-      int featureDimension = Integer.parseInt(args[3]);
-      int labelDimension = Integer.parseInt(args[4]);
-
-      int iteration = 1000;
-      double learningRate = 0.4;
-      double momemtumWeight = 0.2;
-      double regularizationWeight = 0.01;
-
-      // parse parameters
-      if (args.length >= 6) {
-        try {
-          iteration = Integer.parseInt(args[5]);
-          System.out.printf("Iteration: %d\n", iteration);
-        } catch (NumberFormatException e) {
-          System.err
-              .println("MAX_ITERATION format invalid. It should be a positive number.");
-          return;
-        }
-      }
-      if (args.length >= 7) {
-        try {
-          learningRate = Double.parseDouble(args[6]);
-          System.out.printf("Learning rate: %f\n", learningRate);
-        } catch (NumberFormatException e) {
-          System.err
-              .println("LEARNING_RATE format invalid. It should be a positive double in range (0, 1.0)");
-          return;
-        }
-      }
-      if (args.length >= 8) {
-        try {
-          momemtumWeight = Double.parseDouble(args[7]);
-          System.out.printf("Momemtum weight: %f\n", momemtumWeight);
-        } catch (NumberFormatException e) {
-          System.err
-              .println("MOMEMTUM_WEIGHT format invalid. It should be a positive double in range (0, 1.0)");
-          return;
-        }
-      }
-      if (args.length >= 9) {
-        try {
-          regularizationWeight = Double.parseDouble(args[8]);
-          System.out
-              .printf("Regularization weight: %f\n", regularizationWeight);
-        } catch (NumberFormatException e) {
-          System.err
-              .println("REGULARIZATION_WEIGHT format invalid. It should be a positive double in range (0, 1.0)");
-          return;
-        }
-      }
-
-      // train the model
-      SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-      // ann.setLearningRate(learningRate);
-      // ann.setMomemtumWeight(momemtumWeight);
-      // ann.setRegularizationWeight(regularizationWeight);
-      ann.addLayer(featureDimension, false,
-          FunctionFactory.createDoubleFunction("Sigmoid"));
-      ann.addLayer(featureDimension, false,
-          FunctionFactory.createDoubleFunction("Sigmoid"));
-      ann.addLayer(labelDimension, true,
-          FunctionFactory.createDoubleFunction("Sigmoid"));
-      ann.setCostFunction(FunctionFactory
-          .createDoubleDoubleFunction("CrossEntropy"));
-      ann.setModelPath(trainedModelPath);
-
-      Map<String, String> trainingParameters = new HashMap<String, String>();
-      trainingParameters.put("tasks", "2");
-      trainingParameters.put("training.max.iterations", "" + iteration);
-      trainingParameters.put("training.batch.size", "300");
-      trainingParameters.put("convergence.check.interval", "1000");
-      // ann.train(conf, new Path(trainingDataPath), trainingParameters);
-    }
-
-  }
-
-  private static void printUsage() {
-    System.out
-        .println("USAGE: <MODE> <INPUT_PATH> <OUTPUT_PATH> <MODEL_PATH>|<FEATURE_DIMENSION> <LABEL_DIMENSION> [<MAX_ITERATION> <LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT>]");
-    System.out
-        .println("\tMODE\t- train: train the model with given training data.");
-    System.out
-        .println("\t\t- label: obtain the result by feeding the features to the neural network.");
-    System.out
-        .println("\tINPUT_PATH\tin 'train' mode, it is the path of the training data; in 'label' mode, it is the path of the to be evaluated data that lacks the label.");
-    System.out
-        .println("\tOUTPUT_PATH\tin 'train' mode, it is where the trained model is stored; in 'label' mode, it is where the labeled data is stored.");
-    System.out.println("\n\tConditional Parameters:");
-    System.out
-        .println("\tMODEL_PATH\tonly required in 'label' mode. It specifies where to load the trained neural network model.");
-    System.out
-        .println("\tMAX_ITERATION\tonly used in 'train' mode. It specifies how many iterations for the neural network to run. Default is 0.01.");
-    System.out
-        .println("\tLEARNING_RATE\tonly used to 'train' mode. It specifies the degree of aggregation for learning, usually in range (0, 1.0). Default is 0.1.");
-    System.out
-        .println("\tMOMEMTUM_WEIGHT\tonly used to 'train' mode. It specifies the weight of momemtum. Default is 0.");
-    System.out
-        .println("\tREGULARIZATION_WEIGHT\tonly required in 'train' model. It specifies the weight of reqularization.");
-    System.out.println("\nExample:");
-    System.out
-        .println("Train a neural network with with feature dimension 8, label dimension 1 and default setting:\n\tneuralnets train hdfs://localhost:30002/training_data hdfs://localhost:30002/model 8 1");
-    System.out
-        .println("Train a neural network with with feature dimension 8, label dimension 1 and specify learning rate as 0.1, momemtum rate as 0.2, and regularization weight as 0.01:\n\tneuralnets.train hdfs://localhost:30002/training_data hdfs://localhost:30002/model 8 1 0.1 0.2 0.01");
-    System.out
-        .println("Label the data with trained model:\n\tneuralnets evaluate hdfs://localhost:30002/unlabeled_data hdfs://localhost:30002/result hdfs://localhost:30002/model");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/bsp/MLTestBase.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/bsp/MLTestBase.java b/src/test/java/org/apache/horn/bsp/MLTestBase.java
deleted file mode 100644
index 8001bcf..0000000
--- a/src/test/java/org/apache/horn/bsp/MLTestBase.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * The common methods for testing machine learning algorithms
- *
- */
-public abstract class MLTestBase {
-
-  /**
-   * Conduct the 0-1 normalization.
-   * 
-   * @param instances
-   */
-  protected static void zeroOneNormalization(List<double[]> instanceList,
-      int len) {
-    int dimension = len;
-
-    double[] mins = new double[dimension];
-    double[] maxs = new double[dimension];
-    Arrays.fill(mins, Double.MAX_VALUE);
-    Arrays.fill(maxs, Double.MIN_VALUE);
-
-    for (double[] instance : instanceList) {
-      for (int i = 0; i < len; ++i) {
-        if (mins[i] > instance[i]) {
-          mins[i] = instance[i];
-        }
-        if (maxs[i] < instance[i]) {
-          maxs[i] = instance[i];
-        }
-      }
-    }
-
-    for (double[] instance : instanceList) {
-      for (int i = 0; i < len; ++i) {
-        double range = maxs[i] - mins[i];
-        if (range != 0) {
-          instance[i] = (instance[i] - mins[i]) / range;
-        }
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java b/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java
deleted file mode 100644
index a42fd72..0000000
--- a/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleVector;
-import org.junit.Test;
-import org.mortbay.log.Log;
-
-/**
- * Test the functionality of {@link AutoEncoder}.
- * 
- */
-public class TestAutoEncoder extends MLTestBase {
-
-  @Test
-  public void testAutoEncoderSimple() {
-    double[][] instances = { { 0, 0, 0, 1 }, { 0, 0, 1, 0 }, { 0, 1, 0, 0 },
-        { 0, 0, 0, 0 } };
-    AutoEncoder encoder = new AutoEncoder(4, 2);
-    // TODO use the configuration
-
-    // encoder.setLearningRate(0.5);
-    // encoder.setMomemtumWeight(0.2);
-
-    int maxIteration = 2000;
-    Random rnd = new Random();
-    for (int iteration = 0; iteration < maxIteration; ++iteration) {
-      for (int i = 0; i < instances.length; ++i) {
-        encoder.trainOnline(new DenseDoubleVector(instances[rnd
-            .nextInt(instances.length)]));
-      }
-    }
-
-    for (int i = 0; i < instances.length; ++i) {
-      DoubleVector encodeVec = encoder.encode(new DenseDoubleVector(
-          instances[i]));
-      DoubleVector decodeVec = encoder.decode(encodeVec);
-      for (int d = 0; d < instances[i].length; ++d) {
-        assertEquals(instances[i][d], decodeVec.get(d), 0.1);
-      }
-    }
-
-  }
-
-  @Test
-  public void testAutoEncoderSwissRollDataset() {
-    List<double[]> instanceList = new ArrayList<double[]>();
-    try {
-      BufferedReader br = new BufferedReader(new FileReader(
-          "src/test/resources/dimensional_reduction.txt"));
-      String line = null;
-      while ((line = br.readLine()) != null) {
-        String[] tokens = line.split("\t");
-        double[] instance = new double[tokens.length];
-        for (int i = 0; i < instance.length; ++i) {
-          instance[i] = Double.parseDouble(tokens[i]);
-        }
-        instanceList.add(instance);
-      }
-      br.close();
-      // normalize instances
-      zeroOneNormalization(instanceList, instanceList.get(0).length);
-    } catch (FileNotFoundException e) {
-      e.printStackTrace();
-    } catch (NumberFormatException e) {
-      e.printStackTrace();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    List<DoubleVector> vecInstanceList = new ArrayList<DoubleVector>();
-    for (double[] instance : instanceList) {
-      vecInstanceList.add(new DenseDoubleVector(instance));
-    }
-    AutoEncoder encoder = new AutoEncoder(3, 2);
-    // encoder.setLearningRate(0.05);
-    // encoder.setMomemtumWeight(0.1);
-    int maxIteration = 2000;
-    for (int iteration = 0; iteration < maxIteration; ++iteration) {
-      for (DoubleVector vector : vecInstanceList) {
-        encoder.trainOnline(vector);
-      }
-    }
-
-    double errorInstance = 0;
-    for (DoubleVector vector : vecInstanceList) {
-      DoubleVector decoded = encoder.getOutput(vector);
-      DoubleVector diff = vector.subtract(decoded);
-      double error = diff.dot(diff);
-      if (error > 0.1) {
-        ++errorInstance;
-      }
-    }
-    Log.info(String.format("Autoecoder error rate: %f%%\n", errorInstance * 100
-        / vecInstanceList.size()));
-
-  }
-
-  @Test
-  public void testAutoEncoderSwissRollDatasetDistributed() {
-    HamaConfiguration conf = new HamaConfiguration();
-    String strDataPath = "/tmp/dimensional_reduction.txt";
-    Path path = new Path(strDataPath);
-    List<double[]> instanceList = new ArrayList<double[]>();
-    try {
-      FileSystem fs = FileSystem.get(new URI(strDataPath), conf);
-      if (fs.exists(path)) {
-        fs.delete(path, true);
-      }
-
-      String line = null;
-      BufferedReader br = new BufferedReader(new FileReader(
-          "src/test/resources/dimensional_reduction.txt"));
-      while ((line = br.readLine()) != null) {
-        String[] tokens = line.split("\t");
-        double[] instance = new double[tokens.length];
-        for (int i = 0; i < instance.length; ++i) {
-          instance[i] = Double.parseDouble(tokens[i]);
-        }
-        instanceList.add(instance);
-      }
-      br.close();
-      // normalize instances
-      zeroOneNormalization(instanceList, instanceList.get(0).length);
-
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
-          LongWritable.class, VectorWritable.class);
-      for (int i = 0; i < instanceList.size(); ++i) {
-        DoubleVector vector = new DenseDoubleVector(instanceList.get(i));
-        writer.append(new LongWritable(i), new VectorWritable(vector));
-      }
-
-      writer.close();
-    } catch (FileNotFoundException e) {
-      e.printStackTrace();
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (URISyntaxException e) {
-      e.printStackTrace();
-    }
-
-    AutoEncoder encoder = new AutoEncoder(3, 2);
-    String modelPath = "/tmp/autoencoder-modelpath";
-    encoder.setModelPath(modelPath);
-    Map<String, String> trainingParams = new HashMap<String, String>();
-    // encoder.setLearningRate(0.5);
-    trainingParams.put("tasks", "5");
-    trainingParams.put("training.max.iterations", "3000");
-    trainingParams.put("training.batch.size", "200");
-    // encoder.train(conf, path, trainingParams);
-
-    double errorInstance = 0;
-    for (double[] instance : instanceList) {
-      DoubleVector vector = new DenseDoubleVector(instance);
-      DoubleVector decoded = encoder.getOutput(vector);
-      DoubleVector diff = vector.subtract(decoded);
-      double error = diff.dot(diff);
-      if (error > 0.1) {
-        ++errorInstance;
-      }
-    }
-    Log.info(String.format("Autoecoder error rate: %f%%\n", errorInstance * 100
-        / instanceList.size()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java b/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java
deleted file mode 100644
index ee48136..0000000
--- a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-import org.apache.horn.bsp.AbstractLayeredNeuralNetwork.LearningStyle;
-import org.apache.horn.bsp.AbstractLayeredNeuralNetwork.TrainingMethod;
-import org.apache.horn.funcs.FunctionFactory;
-import org.junit.Test;
-import org.mortbay.log.Log;
-
-/**
- * Test the functionality of SmallLayeredNeuralNetwork.
- * 
- */
-public class TestSmallLayeredNeuralNetwork extends MLTestBase {
-
-  @Test
-  public void testReadWrite() {
-    SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    ann.addLayer(2, false,
-        FunctionFactory.createDoubleFunction("IdentityFunction"));
-    ann.addLayer(5, false,
-        FunctionFactory.createDoubleFunction("IdentityFunction"));
-    ann.addLayer(1, true,
-        FunctionFactory.createDoubleFunction("IdentityFunction"));
-    ann.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("SquaredError"));
-    double learningRate = 0.2;
-    // ann.setLearningRate(learningRate);
-    double momentumWeight = 0.5;
-    // ann.setMomemtumWeight(momentumWeight);
-    double regularizationWeight = 0.05;
-    //ann.setRegularizationWeight(regularizationWeight);
-    // intentionally initialize all weights to 0.5
-    DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
-    matrices[0] = new DenseDoubleMatrix(5, 3, 0.2);
-    matrices[1] = new DenseDoubleMatrix(1, 6, 0.8);
-    ann.setWeightMatrices(matrices);
-    ann.setLearningStyle(LearningStyle.UNSUPERVISED);
-    
-    FeatureTransformer defaultFeatureTransformer = new DefaultFeatureTransformer();
-    ann.setFeatureTransformer(defaultFeatureTransformer);
-    
-
-    // write to file
-    String modelPath = "/tmp/testSmallLayeredNeuralNetworkReadWrite";
-    ann.setModelPath(modelPath);
-    try {
-      ann.writeModelToFile();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    // read from file
-    SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(new HamaConfiguration(), modelPath);
-    assertEquals(annCopy.getClass().getSimpleName(), annCopy.getModelType());
-    assertEquals(modelPath, annCopy.getModelPath());
-    // assertEquals(learningRate, annCopy.getLearningRate(), 0.000001);
-    // assertEquals(momentumWeight, annCopy.getMomemtumWeight(), 0.000001);
-    //assertEquals(regularizationWeight, annCopy.getRegularizationWeight(),
-    //    0.000001);
-    assertEquals(TrainingMethod.GRADIENT_DESCENT, annCopy.getTrainingMethod());
-    assertEquals(LearningStyle.UNSUPERVISED, annCopy.getLearningStyle());
-
-    // compare weights
-    DoubleMatrix[] weightsMatrices = annCopy.getWeightMatrices();
-    for (int i = 0; i < weightsMatrices.length; ++i) {
-      DoubleMatrix expectMat = matrices[i];
-      DoubleMatrix actualMat = weightsMatrices[i];
-      for (int j = 0; j < expectMat.getRowCount(); ++j) {
-        for (int k = 0; k < expectMat.getColumnCount(); ++k) {
-          assertEquals(expectMat.get(j, k), actualMat.get(j, k), 0.000001);
-        }
-      }
-    }
-    
-    FeatureTransformer copyTransformer = annCopy.getFeatureTransformer();
-    assertEquals(defaultFeatureTransformer.getClass().getName(), copyTransformer.getClass().getName());
-  }
-
-  @Test
-  /**
-   * Test the forward functionality.
-   */
-  public void testOutput() {
-    // first network
-    SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    ann.addLayer(2, false,
-        FunctionFactory.createDoubleFunction("IdentityFunction"));
-    ann.addLayer(5, false,
-        FunctionFactory.createDoubleFunction("IdentityFunction"));
-    ann.addLayer(1, true,
-        FunctionFactory.createDoubleFunction("IdentityFunction"));
-    ann.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("SquaredError"));
-    // ann.setLearningRate(0.1);
-    // intentionally initialize all weights to 0.5
-    DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
-    matrices[0] = new DenseDoubleMatrix(5, 3, 0.5);
-    matrices[1] = new DenseDoubleMatrix(1, 6, 0.5);
-    ann.setWeightMatrices(matrices);
-
-    double[] arr = new double[] { 0, 1 };
-    DoubleVector training = new DenseDoubleVector(arr);
-    DoubleVector result = ann.getOutput(training);
-    assertEquals(1, result.getDimension());
-    // assertEquals(3, result.get(0), 0.000001);
-
-    // second network
-    SmallLayeredNeuralNetwork ann2 = new SmallLayeredNeuralNetwork();
-    ann2.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann2.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann2.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann2.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("SquaredError"));
-    // ann2.setLearningRate(0.3);
-    // intentionally initialize all weights to 0.5
-    DoubleMatrix[] matrices2 = new DenseDoubleMatrix[2];
-    matrices2[0] = new DenseDoubleMatrix(3, 3, 0.5);
-    matrices2[1] = new DenseDoubleMatrix(1, 4, 0.5);
-    ann2.setWeightMatrices(matrices2);
-
-    double[] test = { 0, 0 };
-    double[] result2 = { 0.807476 };
-
-    DoubleVector vec = ann2.getOutput(new DenseDoubleVector(test));
-    assertArrayEquals(result2, vec.toArray(), 0.000001);
-
-    SmallLayeredNeuralNetwork ann3 = new SmallLayeredNeuralNetwork();
-    ann3.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann3.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann3.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann3.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("SquaredError"));
-    // ann3.setLearningRate(0.3);
-    // intentionally initialize all weights to 0.5
-    DoubleMatrix[] initMatrices = new DenseDoubleMatrix[2];
-    initMatrices[0] = new DenseDoubleMatrix(3, 3, 0.5);
-    initMatrices[1] = new DenseDoubleMatrix(1, 4, 0.5);
-    ann3.setWeightMatrices(initMatrices);
-
-    double[] instance = { 0, 1 };
-    DoubleVector output = ann3.getOutput(new DenseDoubleVector(instance));
-    assertEquals(0.8315410, output.get(0), 0.000001);
-  }
-
-  @Test
-  public void testXORlocal() {
-    SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("SquaredError"));
-    // ann.setLearningRate(0.5);
-    // ann.setMomemtumWeight(0.0);
-
-    int iterations = 50000; // iteration should be set to a very large number
-    double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
-    for (int i = 0; i < iterations; ++i) {
-      DoubleMatrix[] matrices = null;
-      for (int j = 0; j < instances.length; ++j) {
-        matrices = ann.trainByInstance(new DenseDoubleVector(instances[j
-            % instances.length]));
-        ann.updateWeightMatrices(matrices);
-      }
-    }
-
-    for (int i = 0; i < instances.length; ++i) {
-      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
-      // the expected output is the last element in array
-      double result = instances[i][2];
-      double actual = ann.getOutput(input).get(0);
-      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
-        Log.info("Neural network failes to lear the XOR.");
-      }
-    }
-
-    // write model into file and read out
-    String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocal";
-    ann.setModelPath(modelPath);
-    try {
-      ann.writeModelToFile();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-    SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(new HamaConfiguration(), modelPath);
-    // test on instances
-    for (int i = 0; i < instances.length; ++i) {
-      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
-      // the expected output is the last element in array
-      double result = instances[i][2];
-      double actual = annCopy.getOutput(input).get(0);
-      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
-        Log.info("Neural network failes to lear the XOR.");
-      }
-    }
-  }
-
-  @Test
-  public void testXORWithMomentum() {
-    SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("SquaredError"));
-    // ann.setLearningRate(0.6);
-    // ann.setMomemtumWeight(0.3);
-
-    int iterations = 2000; // iteration should be set to a very large number
-    double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
-    for (int i = 0; i < iterations; ++i) {
-      for (int j = 0; j < instances.length; ++j) {
-        ann.trainOnline(new DenseDoubleVector(instances[j % instances.length]));
-      }
-    }
-
-    for (int i = 0; i < instances.length; ++i) {
-      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
-      // the expected output is the last element in array
-      double result = instances[i][2];
-      double actual = ann.getOutput(input).get(0);
-      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
-        Log.info("Neural network failes to lear the XOR.");
-      }
-    }
-
-    // write model into file and read out
-    String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithMomentum";
-    ann.setModelPath(modelPath);
-    try {
-      ann.writeModelToFile();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-    SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(new HamaConfiguration(), modelPath);
-    // test on instances
-    for (int i = 0; i < instances.length; ++i) {
-      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
-      // the expected output is the last element in array
-      double result = instances[i][2];
-      double actual = annCopy.getOutput(input).get(0);
-      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
-        Log.info("Neural network failes to lear the XOR.");
-      }
-    }
-  }
-
-  @Test
-  public void testXORLocalWithRegularization() {
-    SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("SquaredError"));
-    // ann.setLearningRate(0.7);
-    // ann.setMomemtumWeight(0.5);
-    //ann.setRegularizationWeight(0.002);
-
-    int iterations = 5000; // iteration should be set to a very large number
-    double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
-    for (int i = 0; i < iterations; ++i) {
-      for (int j = 0; j < instances.length; ++j) {
-        ann.trainOnline(new DenseDoubleVector(instances[j % instances.length]));
-      }
-    }
-
-    for (int i = 0; i < instances.length; ++i) {
-      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
-      // the expected output is the last element in array
-      double result = instances[i][2];
-      double actual = ann.getOutput(input).get(0);
-      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
-        Log.info("Neural network failes to lear the XOR.");
-      }
-    }
-
-    // write model into file and read out
-    String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithRegularization";
-    ann.setModelPath(modelPath);
-    try {
-      ann.writeModelToFile();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-    SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(new HamaConfiguration(), modelPath);
-    // test on instances
-    for (int i = 0; i < instances.length; ++i) {
-      DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
-      // the expected output is the last element in array
-      double result = instances[i][2];
-      double actual = annCopy.getOutput(input).get(0);
-      if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
-        Log.info("Neural network failes to lear the XOR.");
-      }
-    }
-  }
-
-  @Test
-  public void testTwoClassClassification() {
-    // use logistic regression data
-    String filepath = "src/test/resources/logistic_regression_data.txt";
-    List<double[]> instanceList = new ArrayList<double[]>();
-
-    try {
-      BufferedReader br = new BufferedReader(new FileReader(filepath));
-      String line = null;
-      while ((line = br.readLine()) != null) {
-        String[] tokens = line.trim().split(",");
-        double[] instance = new double[tokens.length];
-        for (int i = 0; i < tokens.length; ++i) {
-          instance[i] = Double.parseDouble(tokens[i]);
-        }
-        instanceList.add(instance);
-      }
-      br.close();
-    } catch (FileNotFoundException e) {
-      e.printStackTrace();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
-    
-    int dimension = instanceList.get(0).length - 1;
-
-    // divide dataset into training and testing
-    List<double[]> testInstances = new ArrayList<double[]>();
-    testInstances.addAll(instanceList.subList(instanceList.size() - 100,
-        instanceList.size()));
-    List<double[]> trainingInstances = instanceList.subList(0,
-        instanceList.size() - 100);
-
-    SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    // ann.setLearningRate(0.001);
-    // ann.setMomemtumWeight(0.1);
-    //ann.setRegularizationWeight(0.01);
-    ann.addLayer(dimension, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(dimension, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(dimension, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("CrossEntropy"));
-
-    long start = new Date().getTime();
-    int iterations = 1000;
-    for (int i = 0; i < iterations; ++i) {
-      for (double[] trainingInstance : trainingInstances) {
-        ann.trainOnline(new DenseDoubleVector(trainingInstance));
-      }
-    }
-    long end = new Date().getTime();
-    Log.info(String.format("Training time: %fs\n",
-        (double) (end - start) / 1000));
-
-    double errorRate = 0;
-    // calculate the error on test instance
-    for (double[] testInstance : testInstances) {
-      DoubleVector instance = new DenseDoubleVector(testInstance);
-      double expected = instance.get(instance.getDimension() - 1);
-      instance = instance.slice(instance.getDimension() - 1);
-      double actual = ann.getOutput(instance).get(0);
-      if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
-        ++errorRate;
-      }
-    }
-    errorRate /= testInstances.size();
-
-    Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
-  }
-  
-  @Test
-  public void testLogisticRegression() {
-    this.testLogisticRegressionDistributedVersion();
-    this.testLogisticRegressionDistributedVersionWithFeatureTransformer();
-  }
-
-  public void testLogisticRegressionDistributedVersion() {
-    // write data into a sequence file
-    String tmpStrDatasetPath = "/tmp/logistic_regression_data";
-    Path tmpDatasetPath = new Path(tmpStrDatasetPath);
-    String strDataPath = "src/test/resources/logistic_regression_data.txt";
-    String modelPath = "/tmp/logistic-regression-distributed-model";
-
-    Configuration conf = new Configuration();
-    List<double[]> instanceList = new ArrayList<double[]>();
-    List<double[]> trainingInstances = null;
-    List<double[]> testInstances = null;
-
-    try {
-      FileSystem fs = FileSystem.get(new URI(tmpStrDatasetPath), conf);
-      fs.delete(tmpDatasetPath, true);
-      if (fs.exists(tmpDatasetPath)) {
-        fs.createNewFile(tmpDatasetPath);
-      }
-
-      BufferedReader br = new BufferedReader(new FileReader(strDataPath));
-      String line = null;
-      int count = 0;
-      while ((line = br.readLine()) != null) {
-        String[] tokens = line.trim().split(",");
-        double[] instance = new double[tokens.length];
-        for (int i = 0; i < tokens.length; ++i) {
-          instance[i] = Double.parseDouble(tokens[i]);
-        }
-        instanceList.add(instance);
-      }
-      br.close();
-      
-      zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
-      
-      // write training data to temporal sequence file
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
-          tmpDatasetPath, LongWritable.class, VectorWritable.class);
-      int testSize = 150;
-
-      Collections.shuffle(instanceList);
-      testInstances = new ArrayList<double[]>();
-      testInstances.addAll(instanceList.subList(instanceList.size() - testSize,
-          instanceList.size()));
-      trainingInstances = instanceList.subList(0, instanceList.size()
-          - testSize);
-
-      for (double[] instance : trainingInstances) {
-        DoubleVector vec = new DenseDoubleVector(instance);
-        writer.append(new LongWritable(count++), new VectorWritable(vec));
-      }
-      writer.close();
-    } catch (FileNotFoundException e) {
-      e.printStackTrace();
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (URISyntaxException e) {
-      e.printStackTrace();
-    }
-
-    // create model
-    int dimension = 8;
-    SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    // ann.setLearningRate(0.7);
-    // ann.setMomemtumWeight(0.5);
-    //ann.setRegularizationWeight(0.1);
-    ann.addLayer(dimension, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(dimension, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(dimension, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("CrossEntropy"));
-    ann.setModelPath(modelPath);
-
-    long start = new Date().getTime();
-    Map<String, String> trainingParameters = new HashMap<String, String>();
-    trainingParameters.put("tasks", "5");
-    trainingParameters.put("training.max.iterations", "2000");
-    trainingParameters.put("training.batch.size", "300");
-    trainingParameters.put("convergence.check.interval", "1000");
-    //ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
-
-    long end = new Date().getTime();
-
-    // validate results
-    double errorRate = 0;
-    // calculate the error on test instance
-    for (double[] testInstance : testInstances) {
-      DoubleVector instance = new DenseDoubleVector(testInstance);
-      double expected = instance.get(instance.getDimension() - 1);
-      instance = instance.slice(instance.getDimension() - 1);
-      double actual = ann.getOutput(instance).get(0);
-      if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
-        ++errorRate;
-      }
-    }
-    errorRate /= testInstances.size();
-
-    Log.info(String.format("Training time: %fs\n",
-        (double) (end - start) / 1000));
-    Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
-  }
-
-  public void testLogisticRegressionDistributedVersionWithFeatureTransformer() {
-    // write data into a sequence file
-    String tmpStrDatasetPath = "/tmp/logistic_regression_data_feature_transformer";
-    Path tmpDatasetPath = new Path(tmpStrDatasetPath);
-    String strDataPath = "src/test/resources/logistic_regression_data.txt";
-    String modelPath = "/tmp/logistic-regression-distributed-model-feature-transformer";
-
-    Configuration conf = new Configuration();
-    List<double[]> instanceList = new ArrayList<double[]>();
-    List<double[]> trainingInstances = null;
-    List<double[]> testInstances = null;
-
-    try {
-      FileSystem fs = FileSystem.get(new URI(tmpStrDatasetPath), conf);
-      fs.delete(tmpDatasetPath, true);
-      if (fs.exists(tmpDatasetPath)) {
-        fs.createNewFile(tmpDatasetPath);
-      }
-
-      BufferedReader br = new BufferedReader(new FileReader(strDataPath));
-      String line = null;
-      int count = 0;
-      while ((line = br.readLine()) != null) {
-        String[] tokens = line.trim().split(",");
-        double[] instance = new double[tokens.length];
-        for (int i = 0; i < tokens.length; ++i) {
-          instance[i] = Double.parseDouble(tokens[i]);
-        }
-        instanceList.add(instance);
-      }
-      br.close();
-      
-      zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
-      
-      // write training data to temporal sequence file
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
-          tmpDatasetPath, LongWritable.class, VectorWritable.class);
-      int testSize = 150;
-
-      Collections.shuffle(instanceList);
-      testInstances = new ArrayList<double[]>();
-      testInstances.addAll(instanceList.subList(instanceList.size() - testSize,
-          instanceList.size()));
-      trainingInstances = instanceList.subList(0, instanceList.size()
-          - testSize);
-
-      for (double[] instance : trainingInstances) {
-        DoubleVector vec = new DenseDoubleVector(instance);
-        writer.append(new LongWritable(count++), new VectorWritable(vec));
-      }
-      writer.close();
-    } catch (FileNotFoundException e) {
-      e.printStackTrace();
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (URISyntaxException e) {
-      e.printStackTrace();
-    }
-
-    // create model
-    int dimension = 8;
-    SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
-    // ann.setLearningRate(0.7);
-    // ann.setMomemtumWeight(0.5);
-    //ann.setRegularizationWeight(0.1);
-    ann.addLayer(dimension, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(dimension, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(dimension, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
-    ann.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("CrossEntropy"));
-    ann.setModelPath(modelPath);
-    
-    FeatureTransformer featureTransformer = new DefaultFeatureTransformer();
-    
-    ann.setFeatureTransformer(featureTransformer);
-
-    long start = new Date().getTime();
-    Map<String, String> trainingParameters = new HashMap<String, String>();
-    trainingParameters.put("tasks", "5");
-    trainingParameters.put("training.max.iterations", "2000");
-    trainingParameters.put("training.batch.size", "300");
-    trainingParameters.put("convergence.check.interval", "1000");
-    //ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
-
-    long end = new Date().getTime();
-
-    // validate results
-    double errorRate = 0;
-    // calculate the error on test instance
-    for (double[] testInstance : testInstances) {
-      DoubleVector instance = new DenseDoubleVector(testInstance);
-      double expected = instance.get(instance.getDimension() - 1);
-      instance = instance.slice(instance.getDimension() - 1);
-      instance = featureTransformer.transform(instance);
-      double actual = ann.getOutput(instance).get(0);
-      if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
-        ++errorRate;
-      }
-    }
-    errorRate /= testInstances.size();
-
-    Log.info(String.format("Training time: %fs\n",
-        (double) (end - start) / 1000));
-    Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetworkMessage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetworkMessage.java b/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetworkMessage.java
deleted file mode 100644
index e422d95..0000000
--- a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetworkMessage.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.junit.Test;
-
-/**
- * Test the functionalities of SmallLayeredNeuralNetworkMessage.
- * 
- */
-public class TestSmallLayeredNeuralNetworkMessage {
-
-  @Test
-  public void testReadWriteWithoutPrev() {
-    double error = 0.22;
-    double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 },
-        { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } };
-    double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } };
-    DoubleMatrix[] matrices = new DoubleMatrix[2];
-    matrices[0] = new DenseDoubleMatrix(matrix1);
-    matrices[1] = new DenseDoubleMatrix(matrix2);
-
-    boolean isConverge = false;
-
-    SmallLayeredNeuralNetworkMessage message = new SmallLayeredNeuralNetworkMessage(
-        error, isConverge, matrices, null);
-    Configuration conf = new Configuration();
-    String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessage";
-    Path path = new Path(strPath);
-    try {
-      FileSystem fs = FileSystem.get(new URI(strPath), conf);
-      FSDataOutputStream out = fs.create(path);
-      message.write(out);
-      out.close();
-
-      FSDataInputStream in = fs.open(path);
-      SmallLayeredNeuralNetworkMessage readMessage = new SmallLayeredNeuralNetworkMessage(
-          0, isConverge, null, null);
-      readMessage.readFields(in);
-      in.close();
-      assertEquals(error, readMessage.getTrainingError(), 0.000001);
-      assertFalse(readMessage.isConverge());
-      DoubleMatrix[] readMatrices = readMessage.getCurMatrices();
-      assertEquals(2, readMatrices.length);
-      for (int i = 0; i < readMatrices.length; ++i) {
-        double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i])
-            .getValues();
-        double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i])
-            .getValues();
-        for (int r = 0; r < doubleMatrices.length; ++r) {
-          assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
-        }
-      }
-
-      DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices();
-      assertNull(readPrevMatrices);
-
-      // delete
-      fs.delete(path, true);
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (URISyntaxException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Test
-  public void testReadWriteWithPrev() {
-    double error = 0.22;
-    boolean isConverge = true;
-
-    double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 },
-        { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } };
-    double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } };
-    DoubleMatrix[] matrices = new DoubleMatrix[2];
-    matrices[0] = new DenseDoubleMatrix(matrix1);
-    matrices[1] = new DenseDoubleMatrix(matrix2);
-
-    double[][] prevMatrix1 = new double[][] { { 0.1, 0.1, 0.2, 0.3 },
-        { 0.2, 0.4, 0.1, 0.5 }, { 0.5, 0.1, 0.5, 0.2 } };
-    double[][] prevMatrix2 = new double[][] { { 0.1, 0.2, 0.5, 0.9 },
-        { 0.3, 0.5, 0.2, 0.6 }, { 0.6, 0.8, 0.7, 0.5 } };
-
-    DoubleMatrix[] prevMatrices = new DoubleMatrix[2];
-    prevMatrices[0] = new DenseDoubleMatrix(prevMatrix1);
-    prevMatrices[1] = new DenseDoubleMatrix(prevMatrix2);
-
-    SmallLayeredNeuralNetworkMessage message = new SmallLayeredNeuralNetworkMessage(
-        error, isConverge, matrices, prevMatrices);
-    Configuration conf = new Configuration();
-    String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessageWithPrev";
-    Path path = new Path(strPath);
-    try {
-      FileSystem fs = FileSystem.get(new URI(strPath), conf);
-      FSDataOutputStream out = fs.create(path);
-      message.write(out);
-      out.close();
-
-      FSDataInputStream in = fs.open(path);
-      SmallLayeredNeuralNetworkMessage readMessage = new SmallLayeredNeuralNetworkMessage(
-          0, isConverge, null, null);
-      readMessage.readFields(in);
-      in.close();
-
-      assertTrue(readMessage.isConverge());
-
-      DoubleMatrix[] readMatrices = readMessage.getCurMatrices();
-      assertEquals(2, readMatrices.length);
-      for (int i = 0; i < readMatrices.length; ++i) {
-        double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i])
-            .getValues();
-        double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i])
-            .getValues();
-        for (int r = 0; r < doubleMatrices.length; ++r) {
-          assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
-        }
-      }
-
-      DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices();
-      assertEquals(2, readPrevMatrices.length);
-      for (int i = 0; i < readPrevMatrices.length; ++i) {
-        double[][] doubleMatrices = ((DenseDoubleMatrix) readPrevMatrices[i])
-            .getValues();
-        double[][] doubleExpected = ((DenseDoubleMatrix) prevMatrices[i])
-            .getValues();
-        for (int r = 0; r < doubleMatrices.length; ++r) {
-          assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
-        }
-      }
-
-      // delete
-      fs.delete(path, true);
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (URISyntaxException e) {
-      e.printStackTrace();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/MLTestBase.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/MLTestBase.java b/src/test/java/org/apache/horn/core/MLTestBase.java
new file mode 100644
index 0000000..3f02600
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/MLTestBase.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The common methods for testing machine learning algorithms
+ *
+ */
+public abstract class MLTestBase {
+
+  /**
+   * Conduct the 0-1 normalization.
+   * 
+   * @param instances
+   */
+  protected static void zeroOneNormalization(List<double[]> instanceList,
+      int len) {
+    int dimension = len;
+
+    double[] mins = new double[dimension];
+    double[] maxs = new double[dimension];
+    Arrays.fill(mins, Double.MAX_VALUE);
+    Arrays.fill(maxs, Double.MIN_VALUE);
+
+    for (double[] instance : instanceList) {
+      for (int i = 0; i < len; ++i) {
+        if (mins[i] > instance[i]) {
+          mins[i] = instance[i];
+        }
+        if (maxs[i] < instance[i]) {
+          maxs[i] = instance[i];
+        }
+      }
+    }
+
+    for (double[] instance : instanceList) {
+      for (int i = 0; i < len; ++i) {
+        double range = maxs[i] - mins[i];
+        if (range != 0) {
+          instance[i] = (instance[i] - mins[i]) / range;
+        }
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/TestAutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestAutoEncoder.java b/src/test/java/org/apache/horn/core/TestAutoEncoder.java
new file mode 100644
index 0000000..10ae738
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/TestAutoEncoder.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.core.AutoEncoder;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+/**
+ * Test the functionality of {@link AutoEncoder}.
+ * 
+ */
+public class TestAutoEncoder extends MLTestBase {
+
+  @Test
+  public void testAutoEncoderSimple() {
+    double[][] instances = { { 0, 0, 0, 1 }, { 0, 0, 1, 0 }, { 0, 1, 0, 0 },
+        { 0, 0, 0, 0 } };
+    AutoEncoder encoder = new AutoEncoder(4, 2);
+    // TODO use the configuration
+
+    // encoder.setLearningRate(0.5);
+    // encoder.setMomemtumWeight(0.2);
+
+    int maxIteration = 2000;
+    Random rnd = new Random();
+    for (int iteration = 0; iteration < maxIteration; ++iteration) {
+      for (int i = 0; i < instances.length; ++i) {
+        encoder.trainOnline(new DenseDoubleVector(instances[rnd
+            .nextInt(instances.length)]));
+      }
+    }
+
+    for (int i = 0; i < instances.length; ++i) {
+      DoubleVector encodeVec = encoder.encode(new DenseDoubleVector(
+          instances[i]));
+      DoubleVector decodeVec = encoder.decode(encodeVec);
+      for (int d = 0; d < instances[i].length; ++d) {
+        assertEquals(instances[i][d], decodeVec.get(d), 0.1);
+      }
+    }
+
+  }
+
+  @Test
+  public void testAutoEncoderSwissRollDataset() {
+    List<double[]> instanceList = new ArrayList<double[]>();
+    try {
+      BufferedReader br = new BufferedReader(new FileReader(
+          "src/test/resources/dimensional_reduction.txt"));
+      String line = null;
+      while ((line = br.readLine()) != null) {
+        String[] tokens = line.split("\t");
+        double[] instance = new double[tokens.length];
+        for (int i = 0; i < instance.length; ++i) {
+          instance[i] = Double.parseDouble(tokens[i]);
+        }
+        instanceList.add(instance);
+      }
+      br.close();
+      // normalize instances
+      zeroOneNormalization(instanceList, instanceList.get(0).length);
+    } catch (FileNotFoundException e) {
+      e.printStackTrace();
+    } catch (NumberFormatException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    List<DoubleVector> vecInstanceList = new ArrayList<DoubleVector>();
+    for (double[] instance : instanceList) {
+      vecInstanceList.add(new DenseDoubleVector(instance));
+    }
+    AutoEncoder encoder = new AutoEncoder(3, 2);
+    // encoder.setLearningRate(0.05);
+    // encoder.setMomemtumWeight(0.1);
+    int maxIteration = 2000;
+    for (int iteration = 0; iteration < maxIteration; ++iteration) {
+      for (DoubleVector vector : vecInstanceList) {
+        encoder.trainOnline(vector);
+      }
+    }
+
+    double errorInstance = 0;
+    for (DoubleVector vector : vecInstanceList) {
+      DoubleVector decoded = encoder.getOutput(vector);
+      DoubleVector diff = vector.subtract(decoded);
+      double error = diff.dot(diff);
+      if (error > 0.1) {
+        ++errorInstance;
+      }
+    }
+    Log.info(String.format("Autoecoder error rate: %f%%\n", errorInstance * 100
+        / vecInstanceList.size()));
+
+  }
+
+  @Test
+  public void testAutoEncoderSwissRollDatasetDistributed() {
+    HamaConfiguration conf = new HamaConfiguration();
+    String strDataPath = "/tmp/dimensional_reduction.txt";
+    Path path = new Path(strDataPath);
+    List<double[]> instanceList = new ArrayList<double[]>();
+    try {
+      FileSystem fs = FileSystem.get(new URI(strDataPath), conf);
+      if (fs.exists(path)) {
+        fs.delete(path, true);
+      }
+
+      String line = null;
+      BufferedReader br = new BufferedReader(new FileReader(
+          "src/test/resources/dimensional_reduction.txt"));
+      while ((line = br.readLine()) != null) {
+        String[] tokens = line.split("\t");
+        double[] instance = new double[tokens.length];
+        for (int i = 0; i < instance.length; ++i) {
+          instance[i] = Double.parseDouble(tokens[i]);
+        }
+        instanceList.add(instance);
+      }
+      br.close();
+      // normalize instances
+      zeroOneNormalization(instanceList, instanceList.get(0).length);
+
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+          LongWritable.class, VectorWritable.class);
+      for (int i = 0; i < instanceList.size(); ++i) {
+        DoubleVector vector = new DenseDoubleVector(instanceList.get(i));
+        writer.append(new LongWritable(i), new VectorWritable(vector));
+      }
+
+      writer.close();
+    } catch (FileNotFoundException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+
+    AutoEncoder encoder = new AutoEncoder(3, 2);
+    String modelPath = "/tmp/autoencoder-modelpath";
+    encoder.setModelPath(modelPath);
+    Map<String, String> trainingParams = new HashMap<String, String>();
+    // encoder.setLearningRate(0.5);
+    trainingParams.put("tasks", "5");
+    trainingParams.put("training.max.iterations", "3000");
+    trainingParams.put("training.batch.size", "200");
+    // encoder.train(conf, path, trainingParams);
+
+    double errorInstance = 0;
+    for (double[] instance : instanceList) {
+      DoubleVector vector = new DenseDoubleVector(instance);
+      DoubleVector decoded = encoder.getOutput(vector);
+      DoubleVector diff = vector.subtract(decoded);
+      double error = diff.dot(diff);
+      if (error > 0.1) {
+        ++errorInstance;
+      }
+    }
+    Log.info(String.format("Autoecoder error rate: %f%%\n", errorInstance * 100
+        / instanceList.size()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/TestNeuron.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestNeuron.java b/src/test/java/org/apache/horn/core/TestNeuron.java
new file mode 100644
index 0000000..f2fe4e1
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/TestNeuron.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.horn.core.Neuron;
+import org.apache.horn.core.Synapse;
+import org.apache.horn.funcs.Sigmoid;
+
+public class TestNeuron extends TestCase {
+  private static double learningRate = 0.1;
+  private static double bias = -1;
+  private static double theta = 0.8;
+
+  public static class MyNeuron extends
+      Neuron<Synapse<DoubleWritable, DoubleWritable>> {
+
+    @Override
+    public void setup(HamaConfiguration conf) {
+    }
+
+    @Override
+    public void forward(
+        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
+        throws IOException {
+      double sum = 0;
+      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
+        sum += m.getInput() * m.getWeight();
+      }
+      sum += (bias * theta);
+      this.feedforward(new Sigmoid().apply(sum));
+    }
+
+    @Override
+    public void backward(
+        Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
+        throws IOException {
+      for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
+        // Calculates error gradient for each neuron
+        double gradient = new Sigmoid().applyDerivative(this.getOutput()) * (m.getDelta() * m.getWeight());
+
+        // Propagates to lower layer
+        backpropagate(gradient);
+
+        // Weight corrections
+        double weight = learningRate * this.getOutput() * m.getDelta();
+        this.push(weight);
+      }
+    }
+
+  }
+
+  public void testProp() throws IOException {
+    List<Synapse<DoubleWritable, DoubleWritable>> x = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
+    x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
+        1.0), new DoubleWritable(0.5)));
+    x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
+        1.0), new DoubleWritable(0.4)));
+
+    MyNeuron n = new MyNeuron();
+    n.forward(x);
+    assertEquals(0.5249791874789399, n.getOutput());
+
+    x.clear();
+    x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
+        -0.1274), new DoubleWritable(-1.2)));
+    n.backward(x);
+    assertEquals(-0.006688234848481696, n.getUpdate());
+  }
+
+}