Posted to dev@horn.apache.org by ed...@apache.org on 2016/04/26 05:46:25 UTC

[4/4] incubator-horn git commit: Code refactoring

Code refactoring


Project: http://git-wip-us.apache.org/repos/asf/incubator-horn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-horn/commit/ac8eaf8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-horn/tree/ac8eaf8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-horn/diff/ac8eaf8e

Branch: refs/heads/master
Commit: ac8eaf8e179b147777e7f1b9312045a42df86373
Parents: 277773c
Author: Edward J. Yoon <ed...@apache.org>
Authored: Tue Apr 26 12:30:53 2016 +0900
Committer: Edward J. Yoon <ed...@apache.org>
Committed: Tue Apr 26 12:42:47 2016 +0900

----------------------------------------------------------------------
 .../horn/bsp/AbstractLayeredNeuralNetwork.java  | 221 -------
 .../java/org/apache/horn/bsp/AutoEncoder.java   | 197 ------
 src/main/java/org/apache/horn/bsp/HornJob.java  | 109 ----
 .../java/org/apache/horn/bsp/NeuralNetwork.java | 237 -------
 .../apache/horn/bsp/NeuralNetworkTrainer.java   | 106 ---
 src/main/java/org/apache/horn/bsp/Neuron.java   |  82 ---
 .../org/apache/horn/bsp/NeuronInterface.java    |  48 --
 .../org/apache/horn/bsp/ParameterMerger.java    |  27 -
 .../apache/horn/bsp/ParameterMergerServer.java  | 132 ----
 .../horn/bsp/SmallLayeredNeuralNetwork.java     | 620 ------------------
 .../bsp/SmallLayeredNeuralNetworkMessage.java   | 126 ----
 .../bsp/SmallLayeredNeuralNetworkTrainer.java   | 216 -------
 src/main/java/org/apache/horn/bsp/Synapse.java  |  85 ---
 .../horn/core/AbstractLayeredNeuralNetwork.java | 221 +++++++
 .../apache/horn/core/AbstractNeuralNetwork.java | 237 +++++++
 .../horn/core/AbstractNeuralNetworkTrainer.java | 108 ++++
 .../java/org/apache/horn/core/AutoEncoder.java  | 197 ++++++
 src/main/java/org/apache/horn/core/HornJob.java | 109 ++++
 .../apache/horn/core/LayeredNeuralNetwork.java  | 621 ++++++++++++++++++
 .../horn/core/LayeredNeuralNetworkTrainer.java  | 216 +++++++
 src/main/java/org/apache/horn/core/Neuron.java  |  82 +++
 .../org/apache/horn/core/NeuronInterface.java   |  48 ++
 .../org/apache/horn/core/ParameterMerger.java   |  27 +
 .../apache/horn/core/ParameterMergerServer.java | 132 ++++
 .../org/apache/horn/core/ParameterMessage.java  | 125 ++++
 src/main/java/org/apache/horn/core/Synapse.java |  85 +++
 .../horn/examples/MultiLayerPerceptron.java     |   8 +-
 .../org/apache/horn/examples/NeuralNetwork.java | 217 -------
 .../java/org/apache/horn/bsp/MLTestBase.java    |  64 --
 .../org/apache/horn/bsp/TestAutoEncoder.java    | 202 ------
 .../horn/bsp/TestSmallLayeredNeuralNetwork.java | 642 -------------------
 .../TestSmallLayeredNeuralNetworkMessage.java   | 172 -----
 .../java/org/apache/horn/core/MLTestBase.java   |  64 ++
 .../org/apache/horn/core/TestAutoEncoder.java   | 203 ++++++
 .../java/org/apache/horn/core/TestNeuron.java   |  93 +++
 .../core/TestSmallLayeredNeuralNetwork.java     | 642 +++++++++++++++++++
 .../TestSmallLayeredNeuralNetworkMessage.java   | 173 +++++
 .../horn/examples/MultiLayerPerceptronTest.java | 177 +++++
 .../apache/horn/examples/NeuralNetworkTest.java | 212 ------
 .../org/apache/horn/trainer/TestNeuron.java     |  93 ---
 40 files changed, 3564 insertions(+), 3812 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
deleted file mode 100644
index b18eb44..0000000
--- a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.math.DoubleDoubleFunction;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * AbstractLayeredNeuralNetwork defines the general operations for layered
- * models derived from it, including Linear Regression, Logistic Regression,
- * Multilayer Perceptron, Autoencoder, and Restricted Boltzmann Machine.
- * 
- * In general, these models consist of neurons which are aligned in layers.
- * The neurons of any two adjacent layers are connected to form a bipartite
- * weighted graph.
- * 
- */
-abstract class AbstractLayeredNeuralNetwork extends NeuralNetwork {
-
-  private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1;
-
-  double trainingError;
-
-  /* The momentumWeight */
-  protected double momentumWeight;
-
-  /* The cost function of the model */
-  protected DoubleDoubleFunction costFunction;
-
-  /* Record the size of each layer */
-  protected List<Integer> layerSizeList;
-
-  protected TrainingMethod trainingMethod;
-  
-  protected LearningStyle learningStyle;
-
-  public static enum TrainingMethod {
-    GRADIENT_DESCENT
-  }
-  
-  public static enum LearningStyle {
-    UNSUPERVISED,
-    SUPERVISED
-  }
-  
-  public AbstractLayeredNeuralNetwork() {
-    this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT;
-    this.trainingMethod = TrainingMethod.GRADIENT_DESCENT;
-    this.learningStyle = LearningStyle.SUPERVISED;
-  }
-
-  public AbstractLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
-    super(conf, modelPath);
-  }
-
-  public void setTrainingMethod(TrainingMethod method) {
-    this.trainingMethod = method;
-  }
-
-  public TrainingMethod getTrainingMethod() {
-    return this.trainingMethod;
-  }
-  
-  public void setLearningStyle(LearningStyle style) {
-    this.learningStyle = style;
-  }
-  
-  public LearningStyle getLearningStyle() {
-    return this.learningStyle;
-  }
-
-  /**
-   * Set the cost function for the model.
-   * 
-   * @param costFunction
-   */
-  public void setCostFunction(DoubleDoubleFunction costFunction) {
-    this.costFunction = costFunction;
-  }
-
-  /**
-   * Add a layer of neurons with the specified size. If the added layer is not
-   * the first layer, its neurons are automatically connected with those of
-   * the previous layer.
-   * 
-   * @param size
-   * @param isFinalLayer If false, add a bias neuron.
-   * @param squashingFunction The squashing function for this layer; the
-   *          input layer is f(x) = x by default.
-   * @return The layer index, starting from 0.
-   */
-  public abstract int addLayer(int size, boolean isFinalLayer,
-      DoubleFunction squashingFunction);
-
-  /**
-   * Get the size of a particular layer.
-   * 
-   * @param layer
-   * @return The layer size.
-   */
-  public int getLayerSize(int layer) {
-    Preconditions.checkArgument(
-        layer >= 0 && layer < this.layerSizeList.size(),
-        String.format("Input must be in range [0, %d]\n",
-            this.layerSizeList.size() - 1));
-    return this.layerSizeList.get(layer);
-  }
-
-  /**
-   * Get the layer size list.
-   * 
-   * @return The layer size list.
-   */
-  protected List<Integer> getLayerSizeList() {
-    return this.layerSizeList;
-  }
-
-  /**
-   * Get the weights between layer layerIdx and layerIdx + 1
-   * 
-   * @param layerIdx The index of the layer
-   * @return The weights in form of {@link DoubleMatrix}
-   */
-  public abstract DoubleMatrix getWeightsByLayer(int layerIdx);
-
-  /**
-   * Get the weight updates computed from one training instance.
-   * 
-   * @param trainingInstance The concatenation of the feature vector and the
-   *          class label vector.
-   * @return The update of each weight, in the form of a matrix array.
-   * @throws Exception
-   */
-  public abstract DoubleMatrix[] trainByInstance(DoubleVector trainingInstance);
-
-  /**
-   * Get the output calculated by the model.
-   * 
-   * @param instance The feature instance.
-   * @return a new vector with the result of the operation.
-   */
-  public abstract DoubleVector getOutput(DoubleVector instance);
-
-  /**
-   * Calculate the training error based on the labels and outputs.
-   * 
-   * @param labels
-   * @param output
-   */
-  protected abstract void calculateTrainingError(DoubleVector labels,
-      DoubleVector output);
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    super.readFields(input);
-    // read momentum weight
-    this.momentumWeight = input.readDouble();
-
-    // read cost function
-    this.costFunction = FunctionFactory
-        .createDoubleDoubleFunction(WritableUtils.readString(input));
-
-    // read layer size list
-    int numLayers = input.readInt();
-    this.layerSizeList = Lists.newArrayList();
-    for (int i = 0; i < numLayers; ++i) {
-      this.layerSizeList.add(input.readInt());
-    }
-
-    this.trainingMethod = WritableUtils.readEnum(input, TrainingMethod.class);
-    this.learningStyle = WritableUtils.readEnum(input, LearningStyle.class);
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    super.write(output);
-    // write momentum weight
-    output.writeDouble(this.momentumWeight);
-
-    // write cost function
-    WritableUtils.writeString(output, costFunction.getFunctionName());
-
-    // write layer size list
-    output.writeInt(this.layerSizeList.size());
-    for (Integer aLayerSizeList : this.layerSizeList) {
-      output.writeInt(aLayerSizeList);
-    }
-
-    WritableUtils.writeEnum(output, this.trainingMethod);
-    WritableUtils.writeEnum(output, this.learningStyle);
-  }
-
-}

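To make the layer API above concrete, here is a minimal sketch of assembling a
three-layer model with it. After this refactoring the classes live under
org.apache.horn.core, with SmallLayeredNeuralNetwork renamed to
LayeredNeuralNetwork per the diffstat; the sketch assumes the constructors and
signatures are unchanged by the move, and the layer sizes are arbitrary:

package org.apache.horn.core;

import org.apache.horn.funcs.FunctionFactory;

public class LayeredModelSketch {
  public static void main(String[] args) {
    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
    // every non-final layer gets an extra bias neuron, and addLayer()
    // returns the 0-based index of the layer it just added
    ann.addLayer(4, false, FunctionFactory.createDoubleFunction("Sigmoid"));
    ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
    ann.addLayer(2, true, FunctionFactory.createDoubleFunction("Sigmoid"));
    ann.setCostFunction(FunctionFactory
        .createDoubleDoubleFunction("SquaredError"));
    ann.setLearningStyle(AbstractLayeredNeuralNetwork.LearningStyle.SUPERVISED);
    System.out.println(ann.getLayerSize(0)); // prints 5: 4 features + bias
  }
}
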
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/AutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/AutoEncoder.java b/src/main/java/org/apache/horn/bsp/AutoEncoder.java
deleted file mode 100644
index 8ea2930..0000000
--- a/src/main/java/org/apache/horn/bsp/AutoEncoder.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ml.util.FeatureTransformer;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * AutoEncoder is a model used for dimensionality reduction and feature
- * learning. It is a special kind of {@link NeuralNetwork} that consists of
- * three layers of neurons, where the first and third layers contain the same
- * number of neurons.
- * 
- */
-public class AutoEncoder {
-
-  private final SmallLayeredNeuralNetwork model;
-
-  /**
-   * Initialize the autoencoder.
-   * 
-   * @param inputDimensions The number of dimensions for the input feature.
-   * @param compressedDimensions The number of dimensions for the compressed
-   *          information.
-   */
-  public AutoEncoder(int inputDimensions, int compressedDimensions) {
-    model = new SmallLayeredNeuralNetwork();
-    model.addLayer(inputDimensions, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    model.addLayer(compressedDimensions, false,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    model.addLayer(inputDimensions, true,
-        FunctionFactory.createDoubleFunction("Sigmoid"));
-    model
-        .setLearningStyle(AbstractLayeredNeuralNetwork.LearningStyle.UNSUPERVISED);
-    model.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction("SquaredError"));
-  }
-
-  public AutoEncoder(HamaConfiguration conf, String modelPath) {
-    model = new SmallLayeredNeuralNetwork(conf, modelPath);
-  }
-
-  public AutoEncoder setModelPath(String modelPath) {
-    model.setModelPath(modelPath);
-    return this;
-  }
-
-  /**
-   * Train the autoencoder with the given data. Note that the training data
-   * is pre-processed: the features are transformed by the configured
-   * {@link FeatureTransformer} before training.
-   * 
-   * @param dataInputPath
-   * @param trainingParams
-   * @throws InterruptedException 
-   * @throws IOException 
-   * @throws ClassNotFoundException 
-   */
-  public BSPJob train(HamaConfiguration conf, Path dataInputPath,
-      Map<String, String> trainingParams) throws ClassNotFoundException, IOException, InterruptedException {
-    return model.train(conf);
-  }
-
-  /**
-   * Train the model with one instance.
-   * 
-   * @param trainingInstance
-   */
-  public void trainOnline(DoubleVector trainingInstance) {
-    model.trainOnline(trainingInstance);
-  }
-
-  /**
-   * Get the matrix M used to encode the input features.
-   * 
-   * @return the matrix that encodes the input.
-   */
-  public DoubleMatrix getEncodeWeightMatrix() {
-    return model.getWeightsByLayer(0);
-  }
-
-  /**
-   * Get the matrix M used to decode the compressed information.
-   * 
-   * @return the matrix that decodes the compressed information.
-   */
-  public DoubleMatrix getDecodeWeightMatrix() {
-    return model.getWeightsByLayer(1);
-  }
-
-  /**
-   * Transform the input features.
-   * 
-   * @param inputInstance
-   * @return The compressed information.
-   */
-  private DoubleVector transform(DoubleVector inputInstance, int inputLayer) {
-    DoubleVector internalInstance = new DenseDoubleVector(
-        inputInstance.getDimension() + 1);
-    internalInstance.set(0, 1);
-    for (int i = 0; i < inputInstance.getDimension(); ++i) {
-      internalInstance.set(i + 1, inputInstance.get(i));
-    }
-    DoubleFunction squashingFunction = model.getSquashingFunction(inputLayer);
-    DoubleMatrix weightMatrix = null;
-    if (inputLayer == 0) {
-      weightMatrix = this.getEncodeWeightMatrix();
-    } else {
-      weightMatrix = this.getDecodeWeightMatrix();
-    }
-    DoubleVector vec = weightMatrix.multiplyVectorUnsafe(internalInstance);
-    vec = vec.applyToElements(squashingFunction);
-    return vec;
-  }
-
-  /**
-   * Encode the input instance.
-   * 
-   * @param inputInstance
-   * @return a new vector with the encoded input instance.
-   */
-  public DoubleVector encode(DoubleVector inputInstance) {
-    Preconditions
-        .checkArgument(
-            inputInstance.getDimension() == model.getLayerSize(0) - 1,
-            String
-                .format(
-                    "The dimension of input instance is %d, but the model requires dimension %d.",
-                    inputInstance.getDimension(), model.getLayerSize(0) - 1));
-    return this.transform(inputInstance, 0);
-  }
-
-  /**
-   * Decode the input instance.
-   * 
-   * @param inputInstance
-   * @return a new vector with the decoded input instance.
-   */
-  public DoubleVector decode(DoubleVector inputInstance) {
-    Preconditions
-        .checkArgument(
-            inputInstance.getDimension() == model.getLayerSize(1) - 1,
-            String
-                .format(
-                    "The dimension of input instance is %d, but the model requires dimension %d.",
-                    inputInstance.getDimension(), model.getLayerSize(1) - 1));
-    return this.transform(inputInstance, 1);
-  }
-
-  /**
-   * Get the label(s) according to the given features.
-   * 
-   * @param inputInstance
-   * @return a new vector with the output of the model for the given feature
-   *         instance.
-   */
-  public DoubleVector getOutput(DoubleVector inputInstance) {
-    return model.getOutput(inputInstance);
-  }
-
-  /**
-   * Set the feature transformer.
-   * 
-   * @param featureTransformer
-   */
-  public void setFeatureTransformer(FeatureTransformer featureTransformer) {
-    this.model.setFeatureTransformer(featureTransformer);
-  }
-
-}

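As a usage illustration for the encode/decode pair above, here is a minimal
sketch (the class moves to org.apache.horn.core in this commit; the input
values are made up, and a single trainOnline() step obviously does not
produce a useful model):

package org.apache.horn.core;

import org.apache.hama.commons.math.DenseDoubleVector;
import org.apache.hama.commons.math.DoubleVector;

public class AutoEncoderSketch {
  public static void main(String[] args) {
    // 6 input dimensions compressed down to 2
    AutoEncoder encoder = new AutoEncoder(6, 2);
    DoubleVector feature = new DenseDoubleVector(
        new double[] { 0.1, 0.2, 0.3, 0.4, 0.5, 0.6 });
    // one online training step against the randomly initialized model
    encoder.trainOnline(feature);
    DoubleVector compressed = encoder.encode(feature);        // dimension 2
    DoubleVector reconstructed = encoder.decode(compressed);  // dimension 6
    System.out.println(compressed + " -> " + reconstructed);
  }
}
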
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/HornJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/HornJob.java b/src/main/java/org/apache/horn/bsp/HornJob.java
deleted file mode 100644
index 4521b87..0000000
--- a/src/main/java/org/apache/horn/bsp/HornJob.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.math.Function;
-import org.apache.horn.funcs.FunctionFactory;
-
-public class HornJob extends BSPJob {
-
-  SmallLayeredNeuralNetwork neuralNetwork;
-
-  public HornJob(HamaConfiguration conf, Class<?> exampleClass)
-      throws IOException {
-    super(conf);
-    this.setJarByClass(exampleClass);
-
-    neuralNetwork = new SmallLayeredNeuralNetwork();
-  }
-
-  public void inputLayer(int featureDimension, Class<? extends Function> func) {
-    addLayer(featureDimension, func);
-  }
-  
-  public void addLayer(int featureDimension, Class<? extends Function> func) {
-    neuralNetwork.addLayer(featureDimension, false,
-        FunctionFactory.createDoubleFunction(func.getSimpleName()));
-  }
-
-  public void outputLayer(int labels, Class<? extends Function> func) {
-    neuralNetwork.addLayer(labels, true,
-        FunctionFactory.createDoubleFunction(func.getSimpleName()));
-  }
-
-  public void setCostFunction(Class<? extends Function> func) {
-    neuralNetwork.setCostFunction(FunctionFactory
-        .createDoubleDoubleFunction(func.getSimpleName()));
-  }
-
-  public void setDouble(String name, double value) {
-    conf.setDouble(name, value);
-  }
-
-  public void setMaxIteration(int maxIteration) {
-    this.conf.setInt("training.max.iterations", maxIteration);
-  }
-
-  public void setBatchSize(int batchSize) {
-    this.conf.setInt("training.batch.size", batchSize);
-  }
-
-  public void setLearningRate(double learningRate) {
-    this.conf.setDouble("mlp.learning.rate", learningRate);
-  }
-
-  public void setConvergenceCheckInterval(int n) {
-    this.conf.setInt("convergence.check.interval", n);
-  }
-
-  public void setMomentumWeight(double momentumWeight) {
-    this.conf.setDouble("mlp.momentum.weight", momentumWeight);
-  }
-
-  public SmallLayeredNeuralNetwork getNeuralNetwork() {
-    return neuralNetwork;
-  }
-
-  public boolean waitForCompletion(boolean verbose) throws IOException,
-      InterruptedException, ClassNotFoundException {
-    BSPJob job = neuralNetwork.train(this.conf);
-    return job.waitForCompletion(verbose);
-  }
-
-  public void setRegularizationWeight(double regularizationWeight) {
-    this.conf.setDouble("regularization.weight", regularizationWeight);
-  }
-
-  public void setModelPath(String modelPath) {
-    this.conf.set("model.path", modelPath);
-    neuralNetwork.setModelPath(modelPath);
-  }
-
-  public void setTrainingSetPath(String inputPath) {
-    this.conf.set("training.input.path", inputPath);
-  }
-
-}

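HornJob is a thin builder over Hama's BSPJob, so wiring up a training job
reduces to the sketch below. The concrete function classes Sigmoid and
SquaredError in org.apache.horn.funcs are assumptions inferred from the
string names used with FunctionFactory elsewhere in this commit (their
sources are not part of this excerpt), and the paths are placeholders:

package org.apache.horn.examples;

import org.apache.hama.HamaConfiguration;
import org.apache.horn.core.HornJob;
import org.apache.horn.funcs.Sigmoid;       // assumed: matches "Sigmoid"
import org.apache.horn.funcs.SquaredError;  // assumed: matches "SquaredError"

public class HornJobSketch {
  public static void main(String[] args) throws Exception {
    HornJob job = new HornJob(new HamaConfiguration(), HornJobSketch.class);
    job.setModelPath("/tmp/mlp.model");        // placeholder path
    job.setTrainingSetPath("/tmp/train.seq");  // placeholder path

    job.inputLayer(784, Sigmoid.class);  // feature dimension
    job.addLayer(100, Sigmoid.class);    // hidden layer
    job.outputLayer(10, Sigmoid.class);  // number of labels
    job.setCostFunction(SquaredError.class);

    job.setMaxIteration(1000);
    job.setBatchSize(100);
    job.setLearningRate(0.1);
    job.setMomentumWeight(0.2);
    job.setConvergenceCheckInterval(100);

    // waitForCompletion() kicks off neuralNetwork.train() under the hood
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
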
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java b/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
deleted file mode 100644
index 051881d..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closeables;
-
-/**
- * NeuralNetwork defines the general operations for all derived models.
- * Typically, derived models such as Linear Regression, Logistic Regression,
- * and Multilayer Perceptron consist of neurons and the weights between
- * neurons.
- * 
- */
-abstract class NeuralNetwork implements Writable {
-  protected HamaConfiguration conf;
-  protected FileSystem fs;
-
-  private static final double DEFAULT_LEARNING_RATE = 0.5;
-
-  protected double learningRate;
-  protected boolean learningRateDecay = false;
-
-  // the name of the model
-  protected String modelType;
-  // the path to store the model
-  protected String modelPath;
-
-  protected FeatureTransformer featureTransformer;
-
-  public NeuralNetwork() {
-    this.learningRate = DEFAULT_LEARNING_RATE;
-    this.modelType = this.getClass().getSimpleName();
-    this.featureTransformer = new DefaultFeatureTransformer();
-  }
-
-  public NeuralNetwork(String modelPath) {
-    this.modelPath = modelPath;
-  }
-
-  public NeuralNetwork(HamaConfiguration conf, String modelPath) {
-    try {
-      this.conf = conf;
-      this.fs = FileSystem.get(conf);
-      this.modelPath = modelPath;
-
-      this.readFromModel();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-  }
-
-  public void isLearningRateDecay(boolean decay) {
-    this.learningRateDecay = decay;
-  }
-
-  public String getModelType() {
-    return this.modelType;
-  }
-
-  /**
-   * Train the model; the training data path and parameters are taken from
-   * the given configuration.
-   * 
-   * @param conf The configuration of the training job.
-   * @throws InterruptedException 
-   * @throws ClassNotFoundException 
-   * @throws IOException
-   */
-  public BSPJob train(Configuration conf) throws ClassNotFoundException, IOException, InterruptedException {
-    Preconditions.checkArgument(this.modelPath != null,
-        "Please set the model path before training.");
-
-    // train with BSP job
-    return trainInternal((HamaConfiguration) conf);
-  }
-
-  /**
-   * Train the model with the path of given training data and parameters.
-   */
-  protected abstract BSPJob trainInternal(HamaConfiguration hamaConf)
-      throws IOException, InterruptedException, ClassNotFoundException;
-
-  /**
-   * Read the model meta-data from the specified location.
-   * 
-   * @throws IOException
-   */
-  protected void readFromModel() throws IOException {
-    Preconditions.checkArgument(this.modelPath != null,
-        "Model path has not been set.");
-    FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath)));
-    this.readFields(is);
-    Closeables.close(is, false);
-  }
-
-  /**
-   * Write the model data to the specified location.
-   * 
-   * @throws IOException
-   */
-  public void writeModelToFile() throws IOException {
-    Preconditions.checkArgument(this.modelPath != null,
-        "Model path has not been set.");
-
-    FSDataOutputStream os = fs.create(new Path(this.modelPath), true);
-    this.write(os);
-
-    Closeables.close(os, false);
-  }
-
-  /**
-   * Set the model path.
-   * 
-   * @param modelPath
-   */
-  public void setModelPath(String modelPath) {
-    this.modelPath = modelPath;
-  }
-
-  /**
-   * Get the model path.
-   * 
-   * @return the path to store the model.
-   */
-  public String getModelPath() {
-    return this.modelPath;
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    // read model type
-    this.modelType = WritableUtils.readString(input);
-    // read learning rate
-    this.learningRate = input.readDouble();
-    // read model path
-    this.modelPath = WritableUtils.readString(input);
-
-    if (this.modelPath.equals("null")) {
-      this.modelPath = null;
-    }
-
-    // read feature transformer
-    int bytesLen = input.readInt();
-    byte[] featureTransformerBytes = new byte[bytesLen];
-    for (int i = 0; i < featureTransformerBytes.length; ++i) {
-      featureTransformerBytes[i] = input.readByte();
-    }
-
-    Class<? extends FeatureTransformer> featureTransformerCls = (Class<? extends FeatureTransformer>) SerializationUtils
-        .deserialize(featureTransformerBytes);
-
-    Constructor[] constructors = featureTransformerCls
-        .getDeclaredConstructors();
-    Constructor constructor = constructors[0];
-
-    try {
-      this.featureTransformer = (FeatureTransformer) constructor
-          .newInstance(new Object[] {});
-    } catch (InstantiationException e) {
-      e.printStackTrace();
-    } catch (IllegalAccessException e) {
-      e.printStackTrace();
-    } catch (IllegalArgumentException e) {
-      e.printStackTrace();
-    } catch (InvocationTargetException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    // write model type
-    WritableUtils.writeString(output, modelType);
-    // write learning rate
-    output.writeDouble(learningRate);
-    // write model path
-    if (this.modelPath != null) {
-      WritableUtils.writeString(output, modelPath);
-    } else {
-      WritableUtils.writeString(output, "null");
-    }
-
-    // serialize the class
-    Class<? extends FeatureTransformer> featureTransformerCls = this.featureTransformer
-        .getClass();
-    byte[] featureTransformerBytes = SerializationUtils
-        .serialize(featureTransformerCls);
-    output.writeInt(featureTransformerBytes.length);
-    output.write(featureTransformerBytes);
-  }
-
-  public void setFeatureTransformer(FeatureTransformer featureTransformer) {
-    this.featureTransformer = featureTransformer;
-  }
-
-  public FeatureTransformer getFeatureTransformer() {
-    return this.featureTransformer;
-  }
-
-}

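Since readFromModel() is invoked from the (conf, modelPath) constructor,
loading and querying a previously trained model reduces to the following
sketch (again assuming the class renames from this commit's diffstat; the
model path and feature values are made up):

package org.apache.horn.core;

import org.apache.hama.HamaConfiguration;
import org.apache.hama.commons.math.DenseDoubleVector;
import org.apache.hama.commons.math.DoubleVector;

public class ModelLoadSketch {
  public static void main(String[] args) {
    HamaConfiguration conf = new HamaConfiguration();
    // the constructor opens the file system and calls readFromModel(),
    // which deserializes the model via readFields()
    LayeredNeuralNetwork ann = new LayeredNeuralNetwork(conf, "/tmp/mlp.model");
    DoubleVector features = new DenseDoubleVector(new double[] { 0.2, 0.5, 0.7 });
    System.out.println(ann.getOutput(features));
  }
}
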
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java b/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
deleted file mode 100644
index 648e86b..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-
-/**
- * The trainer used to train the {@link SmallLayeredNeuralNetwork} with BSP.
- * The trainer reads the training data and computes the trained parameters of
- * the model.
- * 
- */
-public abstract class NeuralNetworkTrainer extends
-    BSP<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> {
-
-  protected static final Log LOG = LogFactory
-      .getLog(NeuralNetworkTrainer.class);
-
-  protected Configuration conf;
-  protected int maxIteration;
-  protected int batchSize;
-  protected String trainingMode;
-  
-  protected FeatureTransformer featureTransformer;
-  
-  @Override
-  final public void setup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException, SyncException, InterruptedException {
-    conf = peer.getConfiguration();
-    featureTransformer = new DefaultFeatureTransformer();
-    this.extraSetup(peer);
-  }
-
-  /**
-   * Handle extra setup for sub-classes.
-   * 
-   * @param peer
-   * @throws IOException
-   * @throws SyncException
-   * @throws InterruptedException
-   */
-  protected void extraSetup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException, SyncException, InterruptedException {
-
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public abstract void bsp(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException, SyncException, InterruptedException;
-
-  @Override
-  public void cleanup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException {
-    this.extraCleanup(peer);
-    // write model to modelPath
-  }
-
-  /**
-   * Handle cleanup for sub-classes. Write the trained model back.
-   * 
-   * @param peer
-   * @throws IOException
-   * @throws SyncException
-   * @throws InterruptedException
-   */
-  protected void extraCleanup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
-      throws IOException {
-
-  }
-
-}

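This base class leaves only bsp() abstract; setup() and cleanup() already
delegate to the extraSetup()/extraCleanup() hooks. A minimal subclass sketch
follows, assuming the renamed core class AbstractNeuralNetworkTrainer keeps
this signature, and using only Hama's standard BSPPeer calls (readNext(),
sync(), getPeerName()):

package org.apache.horn.core;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.io.VectorWritable;

public class EpochCountingTrainer extends AbstractNeuralNetworkTrainer {

  @Override
  public void bsp(
      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
      throws IOException, SyncException, InterruptedException {
    LongWritable key = new LongWritable();
    VectorWritable value = new VectorWritable();
    long count = 0;
    // read every local training instance once, then wait at the barrier
    while (peer.readNext(key, value)) {
      count++;
    }
    LOG.info(peer.getPeerName() + " read " + count + " instances");
    peer.sync();
  }
}
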
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/Neuron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/Neuron.java b/src/main/java/org/apache/horn/bsp/Neuron.java
deleted file mode 100644
index f122b6d..0000000
--- a/src/main/java/org/apache/horn/bsp/Neuron.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.commons.math.DoubleFunction;
-
-public abstract class Neuron<M extends Writable> implements NeuronInterface<M> {
-  double output;
-  double weight;
-  double delta;
-  protected DoubleFunction squashingFunction;
-
-  public void feedforward(double sum) {
-    // store the computed activation as this neuron's output
-    this.output = sum;
-  }
-
-  public void backpropagate(double gradient) {
-    // store the error gradient of this neuron
-    this.delta = gradient;
-  }
-
-  public double getDelta() {
-    return delta;
-  }
-
-  public void setWeight(double weight) {
-    this.weight = weight;
-  }
-
-  public void setOutput(double output) {
-    this.output = output;
-  }
-
-  public double getOutput() {
-    return output;
-  }
-
-  // The methods below communicate with the parameter server.
-  private int i; // cursor into the weights array used by push()
-
-  public void push(double weight) {
-    weights[i++] = weight;
-  }
-
-  public double getUpdate() {
-    return weight;
-  }
-
-  double[] weights;
-
-  public void setWeightVector(int rowCount) {
-    i = 0;
-    weights = new double[rowCount];
-  }
-
-  public double[] getWeights() {
-    return weights;
-  }
-
-  public void setSquashingFunction(DoubleFunction squashingFunction) {
-    this.squashingFunction = squashingFunction;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuronInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuronInterface.java b/src/main/java/org/apache/horn/bsp/NeuronInterface.java
deleted file mode 100644
index bcc1a5a..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuronInterface.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.HamaConfiguration;
-
-public interface NeuronInterface<M extends Writable> {
-
-  public void setup(HamaConfiguration conf);
-  
-  /**
-   * This method is called when the messages are propagated from the lower
-   * layer. It can be used to determine if the neuron would activate, or fire.
-   * 
-   * @param messages
-   * @throws IOException
-   */
-  public void forward(Iterable<M> messages) throws IOException;
-
-  /**
-   * This method is called when the errors are propagated from the upper layer.
-   * It can be used to calculate the error of each neuron and change the
-   * weights.
-   * 
-   * @param messages
-   * @throws IOException
-   */
-  public void backward(Iterable<M> messages) throws IOException;
-  
-}

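Neuron and NeuronInterface together define the per-neuron programming model:
forward() consumes weighted input messages and sets the output, backward()
consumes error messages and push()es weight corrections. A hedged sketch of a
concrete sigmoid-style neuron follows; the Synapse accessors getInput(),
getWeight(), and getDelta() are assumptions, since Synapse's source is not
part of this excerpt:

package org.apache.horn.core;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hama.HamaConfiguration;

public class SigmoidNeuronSketch extends
    Neuron<Synapse<DoubleWritable, DoubleWritable>> {

  private double learningRate;

  @Override
  public void setup(HamaConfiguration conf) {
    this.learningRate = conf.getDouble("mlp.learning.rate", 0.1);
  }

  @Override
  public void forward(Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
      throws IOException {
    double sum = 0;
    for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
      sum += m.getInput() * m.getWeight();  // assumed Synapse accessors
    }
    // squash the weighted sum and store it as this neuron's output
    feedforward(squashingFunction.apply(sum));
  }

  @Override
  public void backward(Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
      throws IOException {
    for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
      // error gradient of this neuron w.r.t. the upper layer's deltas
      double gradient = squashingFunction.applyDerivative(getOutput())
          * m.getDelta() * m.getWeight();
      backpropagate(gradient);
      // push the weight correction for this synapse to the parameter server
      push(-learningRate * getOutput() * m.getDelta());
    }
  }
}
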
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/ParameterMerger.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/ParameterMerger.java b/src/main/java/org/apache/horn/bsp/ParameterMerger.java
deleted file mode 100644
index 6df719a..0000000
--- a/src/main/java/org/apache/horn/bsp/ParameterMerger.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import org.apache.hama.ipc.VersionedProtocol;
-
-public interface ParameterMerger extends VersionedProtocol {
-  long versionID = 1L;
-
-  SmallLayeredNeuralNetworkMessage merge(SmallLayeredNeuralNetworkMessage msg);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java b/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
deleted file mode 100644
index 47aab84..0000000
--- a/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hama.commons.math.DoubleMatrix;
-
-import com.google.common.base.Preconditions;
-
-public class ParameterMergerServer implements ParameterMerger {
-
-  private static final Log LOG = LogFactory.getLog(ParameterMergerServer.class);
-
-  /* The parameter merge base. */
-  protected SmallLayeredNeuralNetwork inMemoryModel;
-
-  /* Whether the training has converged and should terminate. */
-  protected AtomicBoolean isConverge;
-
-  /* The number of slave workers that request commits. */
-  protected int slaveCount;
-
-  /* After mergeLimit merges, terminate whether or not the result has converged. */
-  protected int mergeLimit;
-
-  /*
-   * The last n training errors. Convergence is decided based on the average
-   * value of these errors.
-   */
-  protected double[] trainingErrors;
-
-  /*
-   * The average of the previous n training errors. If the new average
-   * exceeds this value, training is considered converged.
-   */
-  protected double prevAvgTrainingError = Double.MAX_VALUE;
-
-  /* current index for trainingErrors. */
-  protected int curTrainingError = 0;
-
-  /* how many merges have been conducted? */
-  protected int mergeCount = 0;
-
-  public ParameterMergerServer(SmallLayeredNeuralNetwork inMemoryModel,
-      AtomicBoolean isConverge, int slaveCount, int mergeLimit,
-      int convergenceCheckInterval) {
-    this.inMemoryModel = inMemoryModel;
-    this.isConverge = isConverge;
-    this.slaveCount = slaveCount;
-    this.mergeLimit = mergeLimit;
-    this.trainingErrors = new double[convergenceCheckInterval];
-  }
-
-  @Override
-  public long getProtocolVersion(String s, long l) throws IOException {
-    return ParameterMerger.versionID;
-  }
-
-  @Override
-  public SmallLayeredNeuralNetworkMessage merge(
-      SmallLayeredNeuralNetworkMessage msg) {
-
-    double trainingError = msg.getTrainingError();
-    DoubleMatrix[] weightUpdates = msg.getCurMatrices();
-    DoubleMatrix[] prevWeightUpdates = msg.getPrevMatrices();
-
-    Preconditions
-        .checkArgument(weightUpdates.length == prevWeightUpdates.length);
-
-    LOG.info("Start merging: " + this.mergeCount);
-
-    if (!this.isConverge.get()) {
-      for (int i = 0; i < weightUpdates.length; ++i) {
-        weightUpdates[i] = weightUpdates[i].divide(this.slaveCount);
-        prevWeightUpdates[i] = prevWeightUpdates[i].divide(this.slaveCount);
-      }
-
-      synchronized (inMemoryModel) {
-        this.inMemoryModel.updateWeightMatrices(weightUpdates);
-        this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
-
-        // add trainingError to trainingErrors
-        this.trainingErrors[this.curTrainingError++] = trainingError;
-
-        // check convergence
-        if (this.trainingErrors.length == this.curTrainingError) {
-          double curAvgTrainingError = 0.0;
-          for (int i = 0; i < this.curTrainingError; ++i) {
-            curAvgTrainingError += this.trainingErrors[i];
-          }
-          curAvgTrainingError /= this.trainingErrors.length;
-
-          if (prevAvgTrainingError < curAvgTrainingError) {
-            this.isConverge.set(true);
-          } else {
-            // update
-            prevAvgTrainingError = curAvgTrainingError;
-            this.curTrainingError = 0;
-          }
-        }
-
-        if (++this.mergeCount == this.mergeLimit) {
-          this.isConverge.set(true);
-        }
-      }
-    }
-
-    return new SmallLayeredNeuralNetworkMessage(0, this.isConverge.get(),
-        this.inMemoryModel.getWeightMatrices(),
-        this.inMemoryModel.getPrevMatricesUpdates());
-  }
-
-}

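The convergence bookkeeping in merge() is easy to miss inside the
synchronized block, so here is a self-contained sketch of just that
arithmetic: errors fill a window of size convergenceCheckInterval, the window
average is compared with the previous average, and training is flagged
converged once the average stops decreasing (the mergeLimit cutoff is
omitted). The reported error values are made up:

package org.apache.horn.core;

public class ConvergenceCheckSketch {
  public static void main(String[] args) {
    double[] trainingErrors = new double[3];  // convergenceCheckInterval = 3
    double prevAvgTrainingError = Double.MAX_VALUE;
    int curTrainingError = 0;
    boolean isConverge = false;

    double[] reported = { 0.90, 0.80, 0.70, 0.82, 0.83, 0.84 };
    for (double error : reported) {
      trainingErrors[curTrainingError++] = error;
      if (trainingErrors.length == curTrainingError) {
        double curAvgTrainingError = 0.0;
        for (double e : trainingErrors) {
          curAvgTrainingError += e;
        }
        curAvgTrainingError /= trainingErrors.length;

        if (prevAvgTrainingError < curAvgTrainingError) {
          isConverge = true;  // the average error rose again: stop
        } else {
          prevAvgTrainingError = curAvgTrainingError;
          curTrainingError = 0;
        }
      }
    }
    // first window averages 0.80, second 0.83, so this prints: converged = true
    System.out.println("converged = " + isConverge);
  }
}
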
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
deleted file mode 100644
index 0ea8e51..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,620 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.io.MatrixWritable;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.util.ReflectionUtils;
-import org.apache.horn.examples.MultiLayerPerceptron.StandardNeuron;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * SmallLayeredNeuralNetwork defines the general operations for layered models
- * derived from it, including Linear Regression, Logistic Regression,
- * Multilayer Perceptron, Autoencoder, and Restricted Boltzmann Machine. For
- * SmallLayeredNeuralNetwork, the training can be conducted in parallel, but
- * the parameters of the model are assumed to be stored on a single machine.
- * 
- * In general, these models consist of neurons which are aligned in layers.
- * The neurons of any two adjacent layers are connected to form a bipartite
- * weighted graph.
- * 
- */
-public class SmallLayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
-
-  private static final Log LOG = LogFactory
-      .getLog(SmallLayeredNeuralNetwork.class);
-
-  public static Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>> neuronClass;
-
-  /* Weights between neurons at adjacent layers */
-  protected List<DoubleMatrix> weightMatrixList;
-
-  /* Previous weight updates between neurons at adjacent layers */
-  protected List<DoubleMatrix> prevWeightUpdatesList;
-
-  /* Different layers can have different squashing function */
-  protected List<DoubleFunction> squashingFunctionList;
-
-  protected int finalLayerIdx;
-
-  protected double regularizationWeight;
-
-  public SmallLayeredNeuralNetwork() {
-    this.layerSizeList = Lists.newArrayList();
-    this.weightMatrixList = Lists.newArrayList();
-    this.prevWeightUpdatesList = Lists.newArrayList();
-    this.squashingFunctionList = Lists.newArrayList();
-  }
-
-  public SmallLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
-    super(conf, modelPath);
-    this.regularizationWeight = conf.getDouble("regularization.weight", 0);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public int addLayer(int size, boolean isFinalLayer,
-      DoubleFunction squashingFunction) {
-    Preconditions.checkArgument(size > 0,
-        "Size of layer must be larger than 0.");
-    if (!isFinalLayer) {
-      size += 1;
-    }
-
-    LOG.info("Add Layer: " + size);
-    this.layerSizeList.add(size);
-    int layerIdx = this.layerSizeList.size() - 1;
-    if (isFinalLayer) {
-      this.finalLayerIdx = layerIdx;
-    }
-
-    // add weights between the current layer and the previous layer; the
-    // input layer has no squashing function
-    if (layerIdx > 0) {
-      int sizePrevLayer = this.layerSizeList.get(layerIdx - 1);
-      // the row count equals the size of the current layer and the column
-      // count equals the size of the previous layer
-      int row = isFinalLayer ? size : size - 1;
-      int col = sizePrevLayer;
-      DoubleMatrix weightMatrix = new DenseDoubleMatrix(row, col);
-      // initialize weights
-      weightMatrix.applyToElements(new DoubleFunction() {
-        @Override
-        public double apply(double value) {
-          return RandomUtils.nextDouble() - 0.5;
-        }
-
-        @Override
-        public double applyDerivative(double value) {
-          throw new UnsupportedOperationException("");
-        }
-      });
-      this.weightMatrixList.add(weightMatrix);
-      this.prevWeightUpdatesList.add(new DenseDoubleMatrix(row, col));
-      this.squashingFunctionList.add(squashingFunction);
-    }
-    return layerIdx;
-  }
-
-  /**
-   * Update the weight matrices with given matrices.
-   * 
-   * @param matrices
-   */
-  public void updateWeightMatrices(DoubleMatrix[] matrices) {
-    for (int i = 0; i < matrices.length; ++i) {
-      DoubleMatrix matrix = this.weightMatrixList.get(i);
-      this.weightMatrixList.set(i, matrix.add(matrices[i]));
-    }
-  }
-
-  /**
-   * Set the previous weight matrices.
-   * 
-   * @param prevUpdates
-   */
-  void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) {
-    this.prevWeightUpdatesList.clear();
-    Collections.addAll(this.prevWeightUpdatesList, prevUpdates);
-  }
-
-  /**
-   * Add a batch of matrices onto the given destination matrices.
-   * 
-   * @param destMatrices
-   * @param sourceMatrices
-   */
-  static void matricesAdd(DoubleMatrix[] destMatrices,
-      DoubleMatrix[] sourceMatrices) {
-    for (int i = 0; i < destMatrices.length; ++i) {
-      destMatrices[i] = destMatrices[i].add(sourceMatrices[i]);
-    }
-  }
-
-  /**
-   * Get all the weight matrices.
-   * 
-   * @return The matrices in the form of a matrix array.
-   */
-  DoubleMatrix[] getWeightMatrices() {
-    DoubleMatrix[] matrices = new DoubleMatrix[this.weightMatrixList.size()];
-    this.weightMatrixList.toArray(matrices);
-    return matrices;
-  }
-
-  /**
-   * Set the weight matrices.
-   * 
-   * @param matrices
-   */
-  public void setWeightMatrices(DoubleMatrix[] matrices) {
-    this.weightMatrixList = new ArrayList<DoubleMatrix>();
-    Collections.addAll(this.weightMatrixList, matrices);
-  }
-
-  /**
-   * Get the previous weight-update matrices in the form of an array.
-   * 
-   * @return The matrices in the form of a matrix array.
-   */
-  public DoubleMatrix[] getPrevMatricesUpdates() {
-    DoubleMatrix[] prevMatricesUpdates = new DoubleMatrix[this.prevWeightUpdatesList
-        .size()];
-    for (int i = 0; i < this.prevWeightUpdatesList.size(); ++i) {
-      prevMatricesUpdates[i] = this.prevWeightUpdatesList.get(i);
-    }
-    return prevMatricesUpdates;
-  }
-
-  public void setWeightMatrix(int index, DoubleMatrix matrix) {
-    Preconditions.checkArgument(
-        0 <= index && index < this.weightMatrixList.size(), String.format(
-            "index [%d] should be in range [%d, %d).", index, 0,
-            this.weightMatrixList.size()));
-    this.weightMatrixList.set(index, matrix);
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    super.readFields(input);
-
-    // read squash functions
-    int squashingFunctionSize = input.readInt();
-    this.squashingFunctionList = Lists.newArrayList();
-    for (int i = 0; i < squashingFunctionSize; ++i) {
-      this.squashingFunctionList.add(FunctionFactory
-          .createDoubleFunction(WritableUtils.readString(input)));
-    }
-
-    // read weights and construct matrices of previous updates
-    int numOfMatrices = input.readInt();
-    this.weightMatrixList = Lists.newArrayList();
-    this.prevWeightUpdatesList = Lists.newArrayList();
-    for (int i = 0; i < numOfMatrices; ++i) {
-      DoubleMatrix matrix = MatrixWritable.read(input);
-      this.weightMatrixList.add(matrix);
-      this.prevWeightUpdatesList.add(new DenseDoubleMatrix(
-          matrix.getRowCount(), matrix.getColumnCount()));
-    }
-
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    super.write(output);
-
-    // write squashing functions
-    output.writeInt(this.squashingFunctionList.size());
-    for (DoubleFunction aSquashingFunctionList : this.squashingFunctionList) {
-      WritableUtils.writeString(output,
-          aSquashingFunctionList.getFunctionName());
-    }
-
-    // write weight matrices
-    output.writeInt(this.weightMatrixList.size());
-    for (DoubleMatrix aWeightMatrixList : this.weightMatrixList) {
-      MatrixWritable.write(aWeightMatrixList, output);
-    }
-
-    // DO NOT WRITE WEIGHT UPDATE
-  }
-
-  @Override
-  public DoubleMatrix getWeightsByLayer(int layerIdx) {
-    return this.weightMatrixList.get(layerIdx);
-  }
-
-  /**
-   * Get the output of the model according to the given feature instance.
-   */
-  @Override
-  public DoubleVector getOutput(DoubleVector instance) {
-    Preconditions.checkArgument(this.layerSizeList.get(0) - 1 == instance
-        .getDimension(), String.format(
-        "The dimension of input instance should be %d.",
-        this.layerSizeList.get(0) - 1));
-    // transform the features to another space
-    DoubleVector transformedInstance = this.featureTransformer
-        .transform(instance);
-    // add bias feature
-    DoubleVector instanceWithBias = new DenseDoubleVector(
-        transformedInstance.getDimension() + 1);
-    instanceWithBias.set(0, 0.99999); // set bias to be a little bit less than
-                                      // 1.0
-    for (int i = 1; i < instanceWithBias.getDimension(); ++i) {
-      instanceWithBias.set(i, transformedInstance.get(i - 1));
-    }
-
-    List<DoubleVector> outputCache = getOutputInternal(instanceWithBias);
-    // return the output of the last layer
-    DoubleVector result = outputCache.get(outputCache.size() - 1);
-    // remove bias
-    return result.sliceUnsafe(1, result.getDimension() - 1);
-  }
-
-  /**
-   * Calculate the output internally; the intermediate output of each layer
-   * is cached.
-   * 
-   * @param instanceWithBias The instance that contains the features.
-   * @return Cached output of each layer.
-   */
-  public List<DoubleVector> getOutputInternal(DoubleVector instanceWithBias) {
-    List<DoubleVector> outputCache = new ArrayList<DoubleVector>();
-    // fill with instance
-    DoubleVector intermediateOutput = instanceWithBias;
-    outputCache.add(intermediateOutput);
-
-    for (int i = 0; i < this.layerSizeList.size() - 1; ++i) {
-      intermediateOutput = forward(i, intermediateOutput);
-      outputCache.add(intermediateOutput);
-    }
-
-    return outputCache;
-  }
-
-  /**
-   * @return a new neuron instance
-   */
-  public static Neuron<Synapse<DoubleWritable, DoubleWritable>> newNeuronInstance() {
-    return (Neuron<Synapse<DoubleWritable, DoubleWritable>>) ReflectionUtils
-        .newInstance(neuronClass);
-  }
-
-  /**
-   * Forward the calculation for one layer.
-   * 
-   * @param fromLayer The index of the previous layer.
-   * @param intermediateOutput The output of the previous layer.
-   * @return a new vector with the result of the operation.
-   */
-  protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) {
-    DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
-
-    neuronClass = (Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>>) conf
-        .getClass("neuron.class", Neuron.class);
-
-    // TODO use multi-threaded processing for the per-row computation
-    DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
-    for (int row = 0; row < weightMatrix.getRowCount(); row++) {
-      List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
-      for (int col = 0; col < weightMatrix.getColumnCount(); col++) {
-        msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
-            new DoubleWritable(intermediateOutput.get(col)),
-            new DoubleWritable(weightMatrix.get(row, col))));
-      }
-      Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
-      Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
-      n.setup(conf);
-      n.setSquashingFunction(this.squashingFunctionList.get(fromLayer));
-      try {
-        n.forward(iterable);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-      vec.set(row, n.getOutput());
-    }
-
-    // add bias
-    DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
-    vecWithBias.set(0, 1);
-    for (int i = 0; i < vec.getDimension(); ++i) {
-      vecWithBias.set(i + 1, vec.get(i));
-    }
-
-    return vecWithBias;
-  }
-
-  /**
-   * Train the model online, i.e. update the weights after each single
-   * training instance.
-   * 
-   * @param trainingInstance the feature vector, concatenated with the labels
-   *          when learning is supervised
-   */
-  public void trainOnline(DoubleVector trainingInstance) {
-    DoubleMatrix[] updateMatrices = this.trainByInstance(trainingInstance);
-    this.updateWeightMatrices(updateMatrices);
-  }
-
-  @Override
-  public DoubleMatrix[] trainByInstance(DoubleVector trainingInstance) {
-    DoubleVector transformedVector = this.featureTransformer
-        .transform(trainingInstance.sliceUnsafe(this.layerSizeList.get(0) - 1));
-
-    int inputDimension = this.layerSizeList.get(0) - 1;
-    int outputDimension;
-    DoubleVector inputInstance = null;
-    DoubleVector labels = null;
-    if (this.learningStyle == LearningStyle.SUPERVISED) {
-      outputDimension = this.layerSizeList.get(this.layerSizeList.size() - 1);
-      // validate training instance
-      Preconditions.checkArgument(
-          inputDimension + outputDimension == trainingInstance.getDimension(),
-          String
-              .format(
-                  "The dimension of training instance is %d, but requires %d.",
-                  trainingInstance.getDimension(), inputDimension
-                      + outputDimension));
-
-      inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
-      inputInstance.set(0, 1); // add bias
-      // get the features from the transformed vector
-      for (int i = 0; i < inputDimension; ++i) {
-        inputInstance.set(i + 1, transformedVector.get(i));
-      }
-      // get the labels from the original training instance
-      labels = trainingInstance.sliceUnsafe(inputInstance.getDimension() - 1,
-          trainingInstance.getDimension() - 1);
-    } else if (this.learningStyle == LearningStyle.UNSUPERVISED) {
-      // labels are identical to input features
-      outputDimension = inputDimension;
-      // validate training instance
-      Preconditions.checkArgument(inputDimension == trainingInstance
-          .getDimension(), String.format(
-          "The dimension of training instance is %d, but requires %d.",
-          trainingInstance.getDimension(), inputDimension));
-
-      inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
-      inputInstance.set(0, 1); // add bias
-      // get the features from the transformed vector
-      for (int i = 0; i < inputDimension; ++i) {
-        inputInstance.set(i + 1, transformedVector.get(i));
-      }
-      // get the labels by copying the transformed vector
-      labels = transformedVector.deepCopy();
-    }
-
-    List<DoubleVector> internalResults = this.getOutputInternal(inputInstance);
-    DoubleVector output = internalResults.get(internalResults.size() - 1);
-
-    // get the training error
-    calculateTrainingError(labels,
-        output.deepCopy().sliceUnsafe(1, output.getDimension() - 1));
-
-    if (this.trainingMethod.equals(TrainingMethod.GRADIENT_DESCENT)) {
-      return this.trainByInstanceGradientDescent(labels, internalResults);
-    } else {
-      throw new IllegalArgumentException(String.format(
-          "Training method %s is not supported.", this.trainingMethod));
-    }
-  }
-
-  /**
-   * Train by gradient descent. Get the updated weights using one training
-   * instance.
-   * 
-   * @param labels The expected output of the training instance.
-   * @param internalResults The cached output of each layer.
-   * @return The weight update matrices.
-   */
-  private DoubleMatrix[] trainByInstanceGradientDescent(DoubleVector labels,
-      List<DoubleVector> internalResults) {
-
-    DoubleVector output = internalResults.get(internalResults.size() - 1);
-    // initialize weight update matrices
-    DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.weightMatrixList
-        .size()];
-    for (int m = 0; m < weightUpdateMatrices.length; ++m) {
-      weightUpdateMatrices[m] = new DenseDoubleMatrix(this.weightMatrixList
-          .get(m).getRowCount(), this.weightMatrixList.get(m).getColumnCount());
-    }
-    DoubleVector deltaVec = new DenseDoubleVector(
-        this.layerSizeList.get(this.layerSizeList.size() - 1));
-
-    DoubleFunction squashingFunction = this.squashingFunctionList
-        .get(this.squashingFunctionList.size() - 1);
-
-    DoubleMatrix lastWeightMatrix = this.weightMatrixList
-        .get(this.weightMatrixList.size() - 1);
-    for (int i = 0; i < deltaVec.getDimension(); ++i) {
-      double costFuncDerivative = this.costFunction.applyDerivative(
-          labels.get(i), output.get(i + 1));
-      // add regularization
-      costFuncDerivative += this.regularizationWeight
-          * lastWeightMatrix.getRowVector(i).sum();
-      deltaVec.set(
-          i,
-          costFuncDerivative
-              * squashingFunction.applyDerivative(output.get(i + 1)));
-    }
-
-    // start from the layer just before the output layer
-    for (int layer = this.layerSizeList.size() - 2; layer >= 0; --layer) {
-      deltaVec = backpropagate(layer, deltaVec, internalResults,
-          weightUpdateMatrices[layer]);
-    }
-
-    this.setPrevWeightMatrices(weightUpdateMatrices);
-
-    return weightUpdateMatrices;
-  }
-
-  /**
-   * Back-propagate the errors from the next layer to the current layer. The
-   * weight update information is stored in weightUpdateMatrix, and the delta
-   * of the current layer is returned.
-   * 
-   * @param curLayerIdx Index of the current layer.
-   * @param nextLayerDelta Delta of the next layer.
-   * @param outputCache Cached output of each layer.
-   * @param weightUpdateMatrix Matrix that receives the weight updates.
-   * @return the delta vector of the current layer.
-   */
-  private DoubleVector backpropagate(int curLayerIdx,
-      DoubleVector nextLayerDelta, List<DoubleVector> outputCache,
-      DenseDoubleMatrix weightUpdateMatrix) {
-
-    // get layer related information
-    DoubleVector curLayerOutput = outputCache.get(curLayerIdx);
-    DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
-    DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
-
-    // next layer is not output layer, remove the delta of bias neuron
-    if (curLayerIdx != this.layerSizeList.size() - 2) {
-      nextLayerDelta = nextLayerDelta.slice(1,
-          nextLayerDelta.getDimension() - 1);
-    }
-
-    DoubleVector deltaVector = new DenseDoubleVector(
-        weightMatrix.getColumnCount());
-    for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
-      Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
-      // calls setup method
-      n.setup(conf);
-      n.setSquashingFunction(this.squashingFunctionList.get(curLayerIdx));
-      n.setOutput(curLayerOutput.get(row));
-
-      List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
-
-      n.setWeightVector(weightMatrix.getRowCount());
-
-      for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
-        msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
-            new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
-                weightMatrix.get(col, row)), new DoubleWritable(
-                prevWeightMatrix.get(col, row))));
-      }
-
-      Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
-      try {
-        n.backward(iterable);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-
-      // update weights
-      weightUpdateMatrix.setColumn(row, n.getWeights());
-      deltaVector.set(row, n.getDelta());
-    }
-
-    return deltaVector;
-  }
-
-  @Override
-  protected BSPJob trainInternal(HamaConfiguration hamaConf)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    this.conf = hamaConf;
-    this.fs = FileSystem.get(conf);
-
-    String modelPath = conf.get("model.path");
-    if (modelPath != null) {
-      this.modelPath = modelPath;
-    }
-    // modelPath must be set before training
-    if (this.modelPath == null) {
-      throw new IllegalArgumentException(
-          "Please specify the model path, either through setModelPath() "
-              + "or by setting 'model.path' in the training parameters.");
-    }
-    this.writeModelToFile();
-
-    // create job
-    BSPJob job = new BSPJob(conf, SmallLayeredNeuralNetworkTrainer.class);
-    job.setJobName("Small scale Neural Network training");
-    job.setJarByClass(SmallLayeredNeuralNetworkTrainer.class);
-    job.setBspClass(SmallLayeredNeuralNetworkTrainer.class);
-
-    job.getConfiguration().setClass("neuron.class", StandardNeuron.class,
-        Neuron.class);
-
-    // additional task for the parameter server
-    // TODO at the moment we use one task as the parameter server;
-    // the number of parameter servers should eventually be configurable
-    job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
-
-    job.setInputPath(new Path(conf.get("training.input.path")));
-    job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
-    job.setInputKeyClass(LongWritable.class);
-    job.setInputValueClass(VectorWritable.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(NullWritable.class);
-    job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
-
-    return job;
-  }
-
-  @Override
-  protected void calculateTrainingError(DoubleVector labels, DoubleVector output) {
-    DoubleVector errors = labels.deepCopy().applyToElements(output,
-        this.costFunction);
-    this.trainingError = errors.sum();
-  }
-
-  /**
-   * Get the squashing function of a specified layer.
-   * 
-   * @param idx Index of the layer.
-   * @return the squashing function of the specified layer.
-   */
-  public DoubleFunction getSquashingFunction(int idx) {
-    return this.squashingFunctionList.get(idx);
-  }
-
-}
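
Taken together, getOutputInternal(), forward(), and backpropagate() above
implement plain per-instance gradient descent: the forward pass caches each
layer's output (with the bias prepended as element 0), and the backward pass
turns the cost derivative at the output layer into per-layer weight updates.
A minimal driver sketch against this old org.apache.horn.bsp API follows; the
class name AnnDriverSketch, the model path, and the feature/label values are
illustrative assumptions, and a serialized model with three inputs and one
output is assumed to already exist at that path:

    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.commons.math.DenseDoubleVector;
    import org.apache.hama.commons.math.DoubleVector;
    import org.apache.horn.bsp.SmallLayeredNeuralNetwork;

    public class AnnDriverSketch {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();
        // same constructor the BSP trainer uses in its setup() method
        SmallLayeredNeuralNetwork ann =
            new SmallLayeredNeuralNetwork(conf, "/tmp/ann.model");

        // supervised online training: features concatenated with labels,
        // i.e. (input dimension + output dimension) entries per instance
        DoubleVector instance =
            new DenseDoubleVector(new double[] { 0.2, 0.5, 0.8, 1.0 });
        ann.trainOnline(instance);

        // inference: features only; the bias entry is prepended internally
        DoubleVector features =
            new DenseDoubleVector(new double[] { 0.2, 0.5, 0.8 });
        DoubleVector prediction = ann.getOutput(features);
        System.out.println(prediction);
      }
    }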

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
deleted file mode 100644
index 2f8c287..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.commons.io.MatrixWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-
-/**
- * SmallLayeredNeuralNetworkMessage transmits messages between peers during
- * the training of neural networks.
- */
-public class SmallLayeredNeuralNetworkMessage implements Writable {
-
-  protected double trainingError;
-  protected DoubleMatrix[] curMatrices;
-  protected DoubleMatrix[] prevMatrices;
-  protected boolean converge;
-
-  public SmallLayeredNeuralNetworkMessage() {
-  }
-  
-  public SmallLayeredNeuralNetworkMessage(double trainingError,
-      boolean converge, DoubleMatrix[] weightMatrices,
-      DoubleMatrix[] prevMatrices) {
-    this.trainingError = trainingError;
-    this.converge = converge;
-    this.curMatrices = weightMatrices;
-    this.prevMatrices = prevMatrices;
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    trainingError = input.readDouble();
-    converge = input.readBoolean();
-    int numMatrices = input.readInt();
-    boolean hasPrevMatrices = input.readBoolean();
-    curMatrices = new DenseDoubleMatrix[numMatrices];
-    // read the current weight matrices
-    for (int i = 0; i < curMatrices.length; ++i) {
-      curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
-    }
-
-    if (hasPrevMatrices) {
-      prevMatrices = new DenseDoubleMatrix[numMatrices];
-      // read the previous weight update matrices
-      for (int i = 0; i < prevMatrices.length; ++i) {
-        prevMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
-      }
-    }
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    output.writeDouble(trainingError);
-    output.writeBoolean(converge);
-    output.writeInt(curMatrices.length);
-    if (prevMatrices == null) {
-      output.writeBoolean(false);
-    } else {
-      output.writeBoolean(true);
-    }
-    for (DoubleMatrix matrix : curMatrices) {
-      MatrixWritable.write(matrix, output);
-    }
-    if (prevMatrices != null) {
-      for (DoubleMatrix matrix : prevMatrices) {
-        MatrixWritable.write(matrix, output);
-      }
-    }
-  }
-
-  public double getTrainingError() {
-    return trainingError;
-  }
-
-  public void setTrainingError(double trainingError) {
-    this.trainingError = trainingError;
-  }
-
-  public boolean isConverge() {
-    return converge;
-  }
-
-  public void setConverge(boolean converge) {
-    this.converge = converge;
-  }
-
-  public DoubleMatrix[] getCurMatrices() {
-    return curMatrices;
-  }
-
-  public void setMatrices(DoubleMatrix[] curMatrices) {
-    this.curMatrices = curMatrices;
-  }
-
-  public DoubleMatrix[] getPrevMatrices() {
-    return prevMatrices;
-  }
-
-  public void setPrevMatrices(DoubleMatrix[] prevMatrices) {
-    this.prevMatrices = prevMatrices;
-  }
-
-}
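
The write()/readFields() pair above fixes a simple wire format: training
error (double), convergence flag (boolean), matrix count (int), a flag for
the presence of previous matrices (boolean), then the matrices themselves via
MatrixWritable. A round-trip sketch of that format; the harness class name
MessageRoundTrip, the error value, and the dimensions are arbitrary
assumptions:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hama.commons.math.DenseDoubleMatrix;
    import org.apache.hama.commons.math.DoubleMatrix;
    import org.apache.horn.bsp.SmallLayeredNeuralNetworkMessage;

    public class MessageRoundTrip {
      public static void main(String[] args) throws Exception {
        SmallLayeredNeuralNetworkMessage out =
            new SmallLayeredNeuralNetworkMessage(0.05, false,
                new DoubleMatrix[] { new DenseDoubleMatrix(2, 3) }, null);

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        out.write(new DataOutputStream(bos));

        SmallLayeredNeuralNetworkMessage in =
            new SmallLayeredNeuralNetworkMessage();
        in.readFields(new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray())));
        // in.getCurMatrices() now holds one 2x3 matrix;
        // in.getPrevMatrices() stays null because the flag was false
      }
    }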

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
deleted file mode 100644
index c3e258c..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ipc.RPC;
-
-import com.google.common.base.Preconditions;
-
-/**
- * The trainer that trains the {@link SmallLayeredNeuralNetwork} on top of the
- * BSP framework.
- */
-public final class SmallLayeredNeuralNetworkTrainer
-    extends
-    BSP<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> {
-
-  private static final Log LOG = LogFactory
-      .getLog(SmallLayeredNeuralNetworkTrainer.class);
-
-  /* On the master worker: the model that merged parameters are applied to */
-  /* On a slave worker: the local neural network being trained */
-  private SmallLayeredNeuralNetwork inMemoryModel;
-
-  /* Job configuration */
-  private HamaConfiguration conf;
-
-  /* Mini-batch size per merge round */
-  private int batchSize;
-
-  /* whether training has converged */
-  private AtomicBoolean isConverge;
-
-  /* When given peer is master worker: Asynchronous parameter merger */
-  /* When given peer is slave worker: null */
-  private RPC.Server merger;
-
-  /* When given peer is master worker: null */
-  /* When given peer is slave worker: proxy to Asynchronous parameter merger */
-  private ParameterMerger proxy;
-
-  /**
-   * Returns true if the given peer is the master worker.
-   *
-   * @param peer the BSP peer to check
-   */
-  private boolean isMaster(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
-    return peer.getPeerIndex() == peer.getNumPeers() - 1;
-  }
-
-  /**
-   * If the model path is specified, load the existing model from the storage
-   * location.
-   */
-  @Override
-  public void setup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
-    // At least one master & slave worker exist.
-    Preconditions.checkArgument(peer.getNumPeers() >= 2);
-    this.conf = peer.getConfiguration();
-
-    String modelPath = conf.get("model.path");
-    this.inMemoryModel = new SmallLayeredNeuralNetwork(conf, modelPath);
-
-    this.batchSize = conf.getInt("training.batch.size", 50);
-    this.isConverge = new AtomicBoolean(false);
-
-    int slaveCount = peer.getNumPeers() - 1;
-    int mergeLimit = conf.getInt("training.max.iterations", 100000);
-    int convergenceCheckInterval = peer.getNumPeers()
-        * conf.getInt("convergence.check.interval", 2000);
-    String master = peer.getPeerName();
-    String masterAddr = master.substring(0, master.indexOf(':'));
-    int port = conf.getInt("sync.server.port", 40052);
-
-    if (isMaster(peer)) {
-      try {
-        this.merger = RPC.getServer(new ParameterMergerServer(inMemoryModel,
-            isConverge, slaveCount, mergeLimit, convergenceCheckInterval),
-            masterAddr, port, conf);
-        merger.start();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-      LOG.info("Begin to train");
-    } else {
-      InetSocketAddress addr = new InetSocketAddress(masterAddr, port);
-      try {
-        this.proxy = (ParameterMerger) RPC.getProxy(ParameterMerger.class,
-            ParameterMerger.versionID, addr, conf);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  /**
-   * Write the trained model back to the storage location.
-   */
-  @Override
-  public void cleanup(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
-    // write model to modelPath
-    if (isMaster(peer)) {
-      try {
-        LOG.info("Write model back to " + inMemoryModel.getModelPath());
-        this.inMemoryModel.writeModelToFile();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  @Override
-  public void bsp(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer)
-      throws IOException, SyncException, InterruptedException {
-    while (!this.isConverge.get()) {
-      // each slave worker calculates the matrix updates from its local data
-      // partition and merges them with the master
-      if (!isMaster(peer)) {
-        calculateUpdates(peer);
-      }
-    }
-
-    if (isMaster(peer)) {
-      merger.stop();
-    }
-    peer.sync(); // finalize the bsp program.
-  }
-
-  /**
-   * Calculate the matrix updates from the local partition of the data.
-   * 
-   * @param peer
-   * @throws IOException
-   */
-  private void calculateUpdates(
-      BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer)
-      throws IOException {
-
-    DoubleMatrix[] weightUpdates = new DoubleMatrix[this.inMemoryModel.weightMatrixList
-        .size()];
-    for (int i = 0; i < weightUpdates.length; ++i) {
-      int row = this.inMemoryModel.weightMatrixList.get(i).getRowCount();
-      int col = this.inMemoryModel.weightMatrixList.get(i).getColumnCount();
-      weightUpdates[i] = new DenseDoubleMatrix(row, col);
-    }
-
-    // continue to train
-    double avgTrainingError = 0.0;
-    LongWritable key = new LongWritable();
-    VectorWritable value = new VectorWritable();
-    for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) {
-      if (!peer.readNext(key, value)) {
-        peer.reopenInput();
-        peer.readNext(key, value);
-      }
-      DoubleVector trainingInstance = value.getVector();
-      SmallLayeredNeuralNetwork.matricesAdd(weightUpdates,
-          this.inMemoryModel.trainByInstance(trainingInstance));
-      avgTrainingError += this.inMemoryModel.trainingError;
-    }
-    avgTrainingError /= batchSize;
-
-    // calculate the average of updates
-    for (int i = 0; i < weightUpdates.length; ++i) {
-      weightUpdates[i] = weightUpdates[i].divide(batchSize);
-    }
-
-    // exchange parameter update with master
-    SmallLayeredNeuralNetworkMessage msg = new SmallLayeredNeuralNetworkMessage(
-        avgTrainingError, false, weightUpdates,
-        this.inMemoryModel.getPrevMatricesUpdates());
-
-    SmallLayeredNeuralNetworkMessage inMessage = proxy.merge(msg);
-    DoubleMatrix[] newWeights = inMessage.getCurMatrices();
-    DoubleMatrix[] prevWeightUpdates = inMessage.getPrevMatrices();
-    this.inMemoryModel.setWeightMatrices(newWeights);
-    this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
-    this.isConverge.set(inMessage.isConverge());
-  }
-
-}
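
setup() above wires the whole topology from a handful of configuration keys:
the last peer becomes the master and runs the ParameterMergerServer over Hama
RPC, while every other peer trains on its own partition and pushes batched
updates through the ParameterMerger proxy. The keys it reads, with the
defaults visible in the code, can be assembled roughly as below; the model
path value and the class name TrainerConfSketch are illustrative assumptions:

    import org.apache.hama.HamaConfiguration;

    public class TrainerConfSketch {
      public static HamaConfiguration buildConf() {
        HamaConfiguration conf = new HamaConfiguration();
        conf.set("model.path", "/tmp/ann.model");        // required before training
        conf.setInt("training.batch.size", 50);          // instances per merge round
        conf.setInt("training.max.iterations", 100000);  // merge limit on the master
        conf.setInt("convergence.check.interval", 2000); // scaled by the peer count
        conf.setInt("sync.server.port", 40052);          // parameter-merger RPC port
        return conf;
      }
    }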

http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/Synapse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/Synapse.java b/src/main/java/org/apache/horn/bsp/Synapse.java
deleted file mode 100644
index 61725f9..0000000
--- a/src/main/java/org/apache/horn/bsp/Synapse.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Writable wrapper for a message propagated along a synapse: the value being
- * sent plus the weight (and previous weight update) of the connection.
- */
-public class Synapse<M extends Writable, W extends Writable> implements
-    Writable {
-
-  DoubleWritable message;
-  DoubleWritable weight;
-  DoubleWritable prevWeight;
-
-  /** No-arg constructor required by the Writable deserialization contract. */
-  public Synapse() {
-    this(new DoubleWritable(0.0), new DoubleWritable(0.0));
-  }
-
-  public Synapse(DoubleWritable message, DoubleWritable weight) {
-    // default the previous weight update to zero so it is never null
-    this(message, weight, new DoubleWritable(0.0));
-  }
-
-  public Synapse(DoubleWritable message, DoubleWritable weight,
-      DoubleWritable prevWeight) {
-    this.message = message;
-    this.weight = weight;
-    this.prevWeight = prevWeight;
-  }
-  
-  /**
-   * @return the activation (forward pass) or error (backward pass) message
-   */
-  public double getMessage() {
-    return message.get();
-  }
-
-  /**
-   * @return the input activation; alias of {@link #getMessage()} for the
-   *         forward pass
-   */
-  public double getInput() {
-    return message.get();
-  }
-
-  /**
-   * @return the delta; alias of {@link #getMessage()} for the backward pass
-   */
-  public double getDelta() {
-    return message.get();
-  }
-
-  public double getWeight() {
-    return weight.get();
-  }
-  
-  public double getPrevWeight() {
-    return prevWeight.get();
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    message.readFields(in);
-    weight.readFields(in);
-    prevWeight.readFields(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    message.write(out);
-    weight.write(out);
-    // also serialize prevWeight so the momentum term survives a round trip
-    prevWeight.write(out);
-  }
-
-}
\ No newline at end of file
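
For completeness, this is how the network code above constructs these
messages (cf. forward() and backpropagate() in SmallLayeredNeuralNetwork):
the two-argument form carries an activation forward, while the three-argument
form carries a delta backward together with the current weight and the
previous weight update used for the momentum term. A small illustration with
arbitrary values; the class name SynapseSketch is a hypothetical harness:

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.horn.bsp.Synapse;

    public class SynapseSketch {
      public static void main(String[] args) {
        // forward pass: previous layer's activation plus the connecting weight
        Synapse<DoubleWritable, DoubleWritable> fwd =
            new Synapse<DoubleWritable, DoubleWritable>(
                new DoubleWritable(0.7), new DoubleWritable(0.1));

        // backward pass: next layer's delta, the current weight, and the
        // previous weight update (the momentum term)
        Synapse<DoubleWritable, DoubleWritable> bwd =
            new Synapse<DoubleWritable, DoubleWritable>(
                new DoubleWritable(0.02), new DoubleWritable(0.1),
                new DoubleWritable(0.005));

        System.out.println(fwd.getInput());  // 0.7
        System.out.println(bwd.getDelta());  // 0.02
      }
    }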