You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@horn.apache.org by ed...@apache.org on 2016/04/26 05:46:25 UTC
[4/4] incubator-horn git commit: Code refactoring
Code refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-horn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-horn/commit/ac8eaf8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-horn/tree/ac8eaf8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-horn/diff/ac8eaf8e
Branch: refs/heads/master
Commit: ac8eaf8e179b147777e7f1b9312045a42df86373
Parents: 277773c
Author: Edward J. Yoon <ed...@apache.org>
Authored: Tue Apr 26 12:30:53 2016 +0900
Committer: Edward J. Yoon <ed...@apache.org>
Committed: Tue Apr 26 12:42:47 2016 +0900
----------------------------------------------------------------------
.../horn/bsp/AbstractLayeredNeuralNetwork.java | 221 -------
.../java/org/apache/horn/bsp/AutoEncoder.java | 197 ------
src/main/java/org/apache/horn/bsp/HornJob.java | 109 ----
.../java/org/apache/horn/bsp/NeuralNetwork.java | 237 -------
.../apache/horn/bsp/NeuralNetworkTrainer.java | 106 ---
src/main/java/org/apache/horn/bsp/Neuron.java | 82 ---
.../org/apache/horn/bsp/NeuronInterface.java | 48 --
.../org/apache/horn/bsp/ParameterMerger.java | 27 -
.../apache/horn/bsp/ParameterMergerServer.java | 132 ----
.../horn/bsp/SmallLayeredNeuralNetwork.java | 620 ------------------
.../bsp/SmallLayeredNeuralNetworkMessage.java | 126 ----
.../bsp/SmallLayeredNeuralNetworkTrainer.java | 216 -------
src/main/java/org/apache/horn/bsp/Synapse.java | 85 ---
.../horn/core/AbstractLayeredNeuralNetwork.java | 221 +++++++
.../apache/horn/core/AbstractNeuralNetwork.java | 237 +++++++
.../horn/core/AbstractNeuralNetworkTrainer.java | 108 ++++
.../java/org/apache/horn/core/AutoEncoder.java | 197 ++++++
src/main/java/org/apache/horn/core/HornJob.java | 109 ++++
.../apache/horn/core/LayeredNeuralNetwork.java | 621 ++++++++++++++++++
.../horn/core/LayeredNeuralNetworkTrainer.java | 216 +++++++
src/main/java/org/apache/horn/core/Neuron.java | 82 +++
.../org/apache/horn/core/NeuronInterface.java | 48 ++
.../org/apache/horn/core/ParameterMerger.java | 27 +
.../apache/horn/core/ParameterMergerServer.java | 132 ++++
.../org/apache/horn/core/ParameterMessage.java | 125 ++++
src/main/java/org/apache/horn/core/Synapse.java | 85 +++
.../horn/examples/MultiLayerPerceptron.java | 8 +-
.../org/apache/horn/examples/NeuralNetwork.java | 217 -------
.../java/org/apache/horn/bsp/MLTestBase.java | 64 --
.../org/apache/horn/bsp/TestAutoEncoder.java | 202 ------
.../horn/bsp/TestSmallLayeredNeuralNetwork.java | 642 -------------------
.../TestSmallLayeredNeuralNetworkMessage.java | 172 -----
.../java/org/apache/horn/core/MLTestBase.java | 64 ++
.../org/apache/horn/core/TestAutoEncoder.java | 203 ++++++
.../java/org/apache/horn/core/TestNeuron.java | 93 +++
.../core/TestSmallLayeredNeuralNetwork.java | 642 +++++++++++++++++++
.../TestSmallLayeredNeuralNetworkMessage.java | 173 +++++
.../horn/examples/MultiLayerPerceptronTest.java | 177 +++++
.../apache/horn/examples/NeuralNetworkTest.java | 212 ------
.../org/apache/horn/trainer/TestNeuron.java | 93 ---
40 files changed, 3564 insertions(+), 3812 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
deleted file mode 100644
index b18eb44..0000000
--- a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.math.DoubleDoubleFunction;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * AbstractLayeredNeuralNetwork defines the general operations for derivative
- * layered models, include Linear Regression, Logistic Regression, Multilayer
- * Perceptron, Autoencoder, and Restricted Boltzmann Machine, etc.
- *
- * In general, these models consist of neurons which are aligned in layers.
- * Between layers, for any two adjacent layers, the neurons are connected to
- * form a bipartite weighted graph.
- *
- */
-abstract class AbstractLayeredNeuralNetwork extends NeuralNetwork {
-
- private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1;
-
- double trainingError;
-
- /* The momentumWeight */
- protected double momentumWeight;
-
- /* The cost function of the model */
- protected DoubleDoubleFunction costFunction;
-
- /* Record the size of each layer */
- protected List<Integer> layerSizeList;
-
- protected TrainingMethod trainingMethod;
-
- protected LearningStyle learningStyle;
-
- public static enum TrainingMethod {
- GRADIENT_DESCENT
- }
-
- public static enum LearningStyle {
- UNSUPERVISED,
- SUPERVISED
- }
-
- public AbstractLayeredNeuralNetwork() {
- this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT;
- this.trainingMethod = TrainingMethod.GRADIENT_DESCENT;
- this.learningStyle = LearningStyle.SUPERVISED;
- }
-
- public AbstractLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
- super(conf, modelPath);
- }
-
- public void setTrainingMethod(TrainingMethod method) {
- this.trainingMethod = method;
- }
-
- public TrainingMethod getTrainingMethod() {
- return this.trainingMethod;
- }
-
- public void setLearningStyle(LearningStyle style) {
- this.learningStyle = style;
- }
-
- public LearningStyle getLearningStyle() {
- return this.learningStyle;
- }
-
- /**
- * Set the cost function for the model.
- *
- * @param costFunction
- */
- public void setCostFunction(DoubleDoubleFunction costFunction) {
- this.costFunction = costFunction;
- }
-
- /**
- * Add a layer of neurons with specified size. If the added layer is not the
- * first layer, it will automatically connects the neurons between with the
- * previous layer.
- *
- * @param size
- * @param isFinalLayer If false, add a bias neuron.
- * @param squashingFunction The squashing function for this layer, input layer
- * is f(x) = x by default.
- * @return The layer index, starts with 0.
- */
- public abstract int addLayer(int size, boolean isFinalLayer,
- DoubleFunction squashingFunction);
-
- /**
- * Get the size of a particular layer.
- *
- * @param layer
- * @return The layer size.
- */
- public int getLayerSize(int layer) {
- Preconditions.checkArgument(
- layer >= 0 && layer < this.layerSizeList.size(),
- String.format("Input must be in range [0, %d]\n",
- this.layerSizeList.size() - 1));
- return this.layerSizeList.get(layer);
- }
-
- /**
- * Get the layer size list.
- *
- * @return The layer size list.
- */
- protected List<Integer> getLayerSizeList() {
- return this.layerSizeList;
- }
-
- /**
- * Get the weights between layer layerIdx and layerIdx + 1
- *
- * @param layerIdx The index of the layer
- * @return The weights in form of {@link DoubleMatrix}
- */
- public abstract DoubleMatrix getWeightsByLayer(int layerIdx);
-
- /**
- * Get the updated weights using one training instance.
- *
- * @param trainingInstance The trainingInstance is the concatenation of
- * feature vector and class label vector.
- * @return The update of each weight, in form of matrix list.
- * @throws Exception
- */
- public abstract DoubleMatrix[] trainByInstance(DoubleVector trainingInstance);
-
- /**
- * Get the output calculated by the model.
- *
- * @param instance The feature instance.
- * @return a new vector with the result of the operation.
- */
- public abstract DoubleVector getOutput(DoubleVector instance);
-
- /**
- * Calculate the training error based on the labels and outputs.
- *
- * @param labels
- * @param output
- */
- protected abstract void calculateTrainingError(DoubleVector labels,
- DoubleVector output);
-
- @Override
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
- // read momentum weight
- this.momentumWeight = input.readDouble();
-
- // read cost function
- this.costFunction = FunctionFactory
- .createDoubleDoubleFunction(WritableUtils.readString(input));
-
- // read layer size list
- int numLayers = input.readInt();
- this.layerSizeList = Lists.newArrayList();
- for (int i = 0; i < numLayers; ++i) {
- this.layerSizeList.add(input.readInt());
- }
-
- this.trainingMethod = WritableUtils.readEnum(input, TrainingMethod.class);
- this.learningStyle = WritableUtils.readEnum(input, LearningStyle.class);
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- super.write(output);
- // write momentum weight
- output.writeDouble(this.momentumWeight);
-
- // write cost function
- WritableUtils.writeString(output, costFunction.getFunctionName());
-
- // write layer size list
- output.writeInt(this.layerSizeList.size());
- for (Integer aLayerSizeList : this.layerSizeList) {
- output.writeInt(aLayerSizeList);
- }
-
- WritableUtils.writeEnum(output, this.trainingMethod);
- WritableUtils.writeEnum(output, this.learningStyle);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/AutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/AutoEncoder.java b/src/main/java/org/apache/horn/bsp/AutoEncoder.java
deleted file mode 100644
index 8ea2930..0000000
--- a/src/main/java/org/apache/horn/bsp/AutoEncoder.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ml.util.FeatureTransformer;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * AutoEncoder is a model used for dimensional reduction and feature learning.
- * It is a special kind of {@link NeuralNetwork} that consists of three layers
- * of neurons, where the first layer and third layer contains the same number of
- * neurons.
- *
- */
-public class AutoEncoder {
-
- private final SmallLayeredNeuralNetwork model;
-
- /**
- * Initialize the autoencoder.
- *
- * @param inputDimensions The number of dimensions for the input feature.
- * @param compressedDimensions The number of dimensions for the compressed
- * information.
- */
- public AutoEncoder(int inputDimensions, int compressedDimensions) {
- model = new SmallLayeredNeuralNetwork();
- model.addLayer(inputDimensions, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- model.addLayer(compressedDimensions, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- model.addLayer(inputDimensions, true,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- model
- .setLearningStyle(AbstractLayeredNeuralNetwork.LearningStyle.UNSUPERVISED);
- model.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("SquaredError"));
- }
-
- public AutoEncoder(HamaConfiguration conf, String modelPath) {
- model = new SmallLayeredNeuralNetwork(conf, modelPath);
- }
-
- public AutoEncoder setModelPath(String modelPath) {
- model.setModelPath(modelPath);
- return this;
- }
-
- /**
- * Train the autoencoder with given data. Note that the training data is
- * pre-processed, where the features
- *
- * @param dataInputPath
- * @param trainingParams
- * @throws InterruptedException
- * @throws IOException
- * @throws ClassNotFoundException
- */
- public BSPJob train(HamaConfiguration conf, Path dataInputPath,
- Map<String, String> trainingParams) throws ClassNotFoundException, IOException, InterruptedException {
- return model.train(conf);
- }
-
- /**
- * Train the model with one instance.
- *
- * @param trainingInstance
- */
- public void trainOnline(DoubleVector trainingInstance) {
- model.trainOnline(trainingInstance);
- }
-
- /**
- * Get the matrix M used to encode the input features.
- *
- * @return this matrix with encode the input.
- */
- public DoubleMatrix getEncodeWeightMatrix() {
- return model.getWeightsByLayer(0);
- }
-
- /**
- * Get the matrix M used to decode the compressed information.
- *
- * @return this matrix with decode the compressed information.
- */
- public DoubleMatrix getDecodeWeightMatrix() {
- return model.getWeightsByLayer(1);
- }
-
- /**
- * Transform the input features.
- *
- * @param inputInstance
- * @return The compressed information.
- */
- private DoubleVector transform(DoubleVector inputInstance, int inputLayer) {
- DoubleVector internalInstance = new DenseDoubleVector(
- inputInstance.getDimension() + 1);
- internalInstance.set(0, 1);
- for (int i = 0; i < inputInstance.getDimension(); ++i) {
- internalInstance.set(i + 1, inputInstance.get(i));
- }
- DoubleFunction squashingFunction = model.getSquashingFunction(inputLayer);
- DoubleMatrix weightMatrix = null;
- if (inputLayer == 0) {
- weightMatrix = this.getEncodeWeightMatrix();
- } else {
- weightMatrix = this.getDecodeWeightMatrix();
- }
- DoubleVector vec = weightMatrix.multiplyVectorUnsafe(internalInstance);
- vec = vec.applyToElements(squashingFunction);
- return vec;
- }
-
- /**
- * Encode the input instance.
- *
- * @param inputInstance
- * @return a new vector with the encode input instance.
- */
- public DoubleVector encode(DoubleVector inputInstance) {
- Preconditions
- .checkArgument(
- inputInstance.getDimension() == model.getLayerSize(0) - 1,
- String
- .format(
- "The dimension of input instance is %d, but the model requires dimension %d.",
- inputInstance.getDimension(), model.getLayerSize(1) - 1));
- return this.transform(inputInstance, 0);
- }
-
- /**
- * Decode the input instance.
- *
- * @param inputInstance
- * @return a new vector with the decode input instance.
- */
- public DoubleVector decode(DoubleVector inputInstance) {
- Preconditions
- .checkArgument(
- inputInstance.getDimension() == model.getLayerSize(1) - 1,
- String
- .format(
- "The dimension of input instance is %d, but the model requires dimension %d.",
- inputInstance.getDimension(), model.getLayerSize(1) - 1));
- return this.transform(inputInstance, 1);
- }
-
- /**
- * Get the label(s) according to the given features.
- *
- * @param inputInstance
- * @return a new vector with output of the model according to given feature
- * instance.
- */
- public DoubleVector getOutput(DoubleVector inputInstance) {
- return model.getOutput(inputInstance);
- }
-
- /**
- * Set the feature transformer.
- *
- * @param featureTransformer
- */
- public void setFeatureTransformer(FeatureTransformer featureTransformer) {
- this.model.setFeatureTransformer(featureTransformer);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/HornJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/HornJob.java b/src/main/java/org/apache/horn/bsp/HornJob.java
deleted file mode 100644
index 4521b87..0000000
--- a/src/main/java/org/apache/horn/bsp/HornJob.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.math.Function;
-import org.apache.horn.funcs.FunctionFactory;
-
-public class HornJob extends BSPJob {
-
- SmallLayeredNeuralNetwork neuralNetwork;
-
- public HornJob(HamaConfiguration conf, Class<?> exampleClass)
- throws IOException {
- super(conf);
- this.setJarByClass(exampleClass);
-
- neuralNetwork = new SmallLayeredNeuralNetwork();
- }
-
- public void inputLayer(int featureDimension, Class<? extends Function> func) {
- addLayer(featureDimension, func);
- }
-
- public void addLayer(int featureDimension, Class<? extends Function> func) {
- neuralNetwork.addLayer(featureDimension, false,
- FunctionFactory.createDoubleFunction(func.getSimpleName()));
- }
-
- public void outputLayer(int labels, Class<? extends Function> func) {
- neuralNetwork.addLayer(labels, true,
- FunctionFactory.createDoubleFunction(func.getSimpleName()));
- }
-
- public void setCostFunction(Class<? extends Function> func) {
- neuralNetwork.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction(func.getSimpleName()));
- }
-
- public void setDouble(String name, double value) {
- conf.setDouble(name, value);
- }
-
- public void setMaxIteration(int maxIteration) {
- this.conf.setInt("training.max.iterations", maxIteration);
- }
-
- public void setBatchSize(int batchSize) {
- this.conf.setInt("training.batch.size", batchSize);
- }
-
- public void setLearningRate(double learningRate) {
- this.conf.setDouble("mlp.learning.rate", learningRate);
- }
-
- public void setConvergenceCheckInterval(int n) {
- this.conf.setInt("convergence.check.interval", n);
- }
-
- public void setMomentumWeight(double momentumWeight) {
- this.conf.setDouble("mlp.momentum.weight", momentumWeight);
- }
-
- public SmallLayeredNeuralNetwork getNeuralNetwork() {
- return neuralNetwork;
- }
-
- public boolean waitForCompletion(boolean verbose) throws IOException,
- InterruptedException, ClassNotFoundException {
- BSPJob job = neuralNetwork.train(this.conf);
- if (verbose) {
- return job.waitForCompletion(true);
- } else {
- return job.waitForCompletion(false);
- }
- }
-
- public void setRegularizationWeight(double regularizationWeight) {
- this.conf.setDouble("regularization.weight", regularizationWeight);
- }
-
- public void setModelPath(String modelPath) {
- this.conf.set("model.path", modelPath);
- neuralNetwork.setModelPath(modelPath);
- }
-
- public void setTrainingSetPath(String inputPath) {
- this.conf.set("training.input.path", inputPath);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java b/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
deleted file mode 100644
index 051881d..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closeables;
-
-/**
- * NeuralNetwork defines the general operations for all the derivative models.
- * Typically, all derivative models such as Linear Regression, Logistic
- * Regression, and Multilayer Perceptron consist of neurons and the weights
- * between neurons.
- *
- */
-abstract class NeuralNetwork implements Writable {
- protected HamaConfiguration conf;
- protected FileSystem fs;
-
- private static final double DEFAULT_LEARNING_RATE = 0.5;
-
- protected double learningRate;
- protected boolean learningRateDecay = false;
-
- // the name of the model
- protected String modelType;
- // the path to store the model
- protected String modelPath;
-
- protected FeatureTransformer featureTransformer;
-
- public NeuralNetwork() {
- this.learningRate = DEFAULT_LEARNING_RATE;
- this.modelType = this.getClass().getSimpleName();
- this.featureTransformer = new DefaultFeatureTransformer();
- }
-
- public NeuralNetwork(String modelPath) {
- this.modelPath = modelPath;
- }
-
- public NeuralNetwork(HamaConfiguration conf, String modelPath) {
- try {
- this.conf = conf;
- this.fs = FileSystem.get(conf);
- this.modelPath = modelPath;
-
- this.readFromModel();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- public void isLearningRateDecay(boolean decay) {
- this.learningRateDecay = decay;
- }
-
- public String getModelType() {
- return this.modelType;
- }
-
- /**
- * Train the model with the path of given training data and parameters.
- *
- * @param dataInputPath The path of the training data.
- * @param trainingParams The parameters for training.
- * @throws InterruptedException
- * @throws ClassNotFoundException
- * @throws IOException
- */
- public BSPJob train(Configuration conf) throws ClassNotFoundException, IOException, InterruptedException {
- Preconditions.checkArgument(this.modelPath != null,
- "Please set the model path before training.");
-
- // train with BSP job
- return trainInternal((HamaConfiguration) conf);
- }
-
- /**
- * Train the model with the path of given training data and parameters.
- */
- protected abstract BSPJob trainInternal(HamaConfiguration hamaConf)
- throws IOException, InterruptedException, ClassNotFoundException;
-
- /**
- * Read the model meta-data from the specified location.
- *
- * @throws IOException
- */
- protected void readFromModel() throws IOException {
- Preconditions.checkArgument(this.modelPath != null,
- "Model path has not been set.");
- FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath)));
- this.readFields(is);
- Closeables.close(is, false);
- }
-
- /**
- * Write the model data to specified location.
- *
- * @throws IOException
- */
- public void writeModelToFile() throws IOException {
- Preconditions.checkArgument(this.modelPath != null,
- "Model path has not been set.");
-
- FSDataOutputStream is = fs.create(new Path(this.modelPath), true);
- this.write(is);
-
- Closeables.close(is, false);
- }
-
- /**
- * Set the model path.
- *
- * @param modelPath
- */
- public void setModelPath(String modelPath) {
- this.modelPath = modelPath;
- }
-
- /**
- * Get the model path.
- *
- * @return the path to store the model.
- */
- public String getModelPath() {
- return this.modelPath;
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public void readFields(DataInput input) throws IOException {
- // read model type
- this.modelType = WritableUtils.readString(input);
- // read learning rate
- this.learningRate = input.readDouble();
- // read model path
- this.modelPath = WritableUtils.readString(input);
-
- if (this.modelPath.equals("null")) {
- this.modelPath = null;
- }
-
- // read feature transformer
- int bytesLen = input.readInt();
- byte[] featureTransformerBytes = new byte[bytesLen];
- for (int i = 0; i < featureTransformerBytes.length; ++i) {
- featureTransformerBytes[i] = input.readByte();
- }
-
- Class<? extends FeatureTransformer> featureTransformerCls = (Class<? extends FeatureTransformer>) SerializationUtils
- .deserialize(featureTransformerBytes);
-
- Constructor[] constructors = featureTransformerCls
- .getDeclaredConstructors();
- Constructor constructor = constructors[0];
-
- try {
- this.featureTransformer = (FeatureTransformer) constructor
- .newInstance(new Object[] {});
- } catch (InstantiationException e) {
- e.printStackTrace();
- } catch (IllegalAccessException e) {
- e.printStackTrace();
- } catch (IllegalArgumentException e) {
- e.printStackTrace();
- } catch (InvocationTargetException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- // write model type
- WritableUtils.writeString(output, modelType);
- // write learning rate
- output.writeDouble(learningRate);
- // write model path
- if (this.modelPath != null) {
- WritableUtils.writeString(output, modelPath);
- } else {
- WritableUtils.writeString(output, "null");
- }
-
- // serialize the class
- Class<? extends FeatureTransformer> featureTransformerCls = this.featureTransformer
- .getClass();
- byte[] featureTransformerBytes = SerializationUtils
- .serialize(featureTransformerCls);
- output.writeInt(featureTransformerBytes.length);
- output.write(featureTransformerBytes);
- }
-
- public void setFeatureTransformer(FeatureTransformer featureTransformer) {
- this.featureTransformer = featureTransformer;
- }
-
- public FeatureTransformer getFeatureTransformer() {
- return this.featureTransformer;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java b/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
deleted file mode 100644
index 648e86b..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-
-/**
- * The trainer that is used to train the {@link SmallLayeredNeuralNetwork} with
- * BSP. The trainer would read the training data and obtain the trained
- * parameters of the model.
- *
- */
-public abstract class NeuralNetworkTrainer extends
- BSP<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> {
-
- protected static final Log LOG = LogFactory
- .getLog(NeuralNetworkTrainer.class);
-
- protected Configuration conf;
- protected int maxIteration;
- protected int batchSize;
- protected String trainingMode;
-
- protected FeatureTransformer featureTransformer;
-
- @Override
- final public void setup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException, SyncException, InterruptedException {
- conf = peer.getConfiguration();
- featureTransformer = new DefaultFeatureTransformer();
- this.extraSetup(peer);
- }
-
- /**
- * Handle extra setup for sub-classes.
- *
- * @param peer
- * @throws IOException
- * @throws SyncException
- * @throws InterruptedException
- */
- protected void extraSetup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException, SyncException, InterruptedException {
-
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public abstract void bsp(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException, SyncException, InterruptedException;
-
- @Override
- public void cleanup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException {
- this.extraCleanup(peer);
- // write model to modelPath
- }
-
- /**
- * Handle cleanup for sub-classes. Write the trained model back.
- *
- * @param peer
- * @throws IOException
- * @throws SyncException
- * @throws InterruptedException
- */
- protected void extraCleanup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException {
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/Neuron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/Neuron.java b/src/main/java/org/apache/horn/bsp/Neuron.java
deleted file mode 100644
index f122b6d..0000000
--- a/src/main/java/org/apache/horn/bsp/Neuron.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.commons.math.DoubleFunction;
-
-public abstract class Neuron<M extends Writable> implements NeuronInterface<M> {
- double output;
- double weight;
- double delta;
- protected DoubleFunction squashingFunction;
-
- public void feedforward(double sum) {
- // TODO Auto-generated method stub
- // squashing
- this.output = sum;
- }
-
- public void backpropagate(double gradient) {
- // TODO Auto-generated method stub
- this.delta = gradient;
- }
-
- public double getDelta() {
- return delta;
- }
-
- public void setWeight(double weight) {
- this.weight = weight;
- }
-
- public void setOutput(double output) {
- this.output = output;
- }
-
- public double getOutput() {
- return output;
- }
-
- // ////////* Below methods will communicate with parameter server */
- private int i;
-
- public void push(double weight) {
- weights[i++] = weight;
- }
-
- public double getUpdate() {
- return weight;
- }
-
- double[] weights;
-
- public void setWeightVector(int rowCount) {
- i = 0;
- weights = new double[rowCount];
- }
-
- public double[] getWeights() {
- return weights;
- }
-
- public void setSquashingFunction(DoubleFunction squashingFunction) {
- this.squashingFunction = squashingFunction;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuronInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuronInterface.java b/src/main/java/org/apache/horn/bsp/NeuronInterface.java
deleted file mode 100644
index bcc1a5a..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuronInterface.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.HamaConfiguration;
-
-public interface NeuronInterface<M extends Writable> {
-
- public void setup(HamaConfiguration conf);
-
- /**
- * This method is called when the messages are propagated from the lower
- * layer. It can be used to determine if the neuron would activate, or fire.
- *
- * @param messages
- * @throws IOException
- */
- public void forward(Iterable<M> messages) throws IOException;
-
- /**
- * This method is called when the errors are propagated from the upper layer.
- * It can be used to calculate the error of each neuron and change the
- * weights.
- *
- * @param messages
- * @throws IOException
- */
- public void backward(Iterable<M> messages) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/ParameterMerger.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/ParameterMerger.java b/src/main/java/org/apache/horn/bsp/ParameterMerger.java
deleted file mode 100644
index 6df719a..0000000
--- a/src/main/java/org/apache/horn/bsp/ParameterMerger.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import org.apache.hama.ipc.VersionedProtocol;
-
-public interface ParameterMerger extends VersionedProtocol {
- long versionID = 1L;
-
- SmallLayeredNeuralNetworkMessage merge(SmallLayeredNeuralNetworkMessage msg);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java b/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
deleted file mode 100644
index 47aab84..0000000
--- a/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hama.commons.math.DoubleMatrix;
-
-import com.google.common.base.Preconditions;
-
-public class ParameterMergerServer implements ParameterMerger {
-
- private static final Log LOG = LogFactory.getLog(ParameterMergerServer.class);
-
- /* The parameter merge base. */
- protected SmallLayeredNeuralNetwork inMemoryModel;
-
- /* To terminate or not to terminate. */
- protected AtomicBoolean isConverge;
-
- /* The number of slave works that request commits. */
- protected int SlaveCount;
-
- /* After mergeLimit, terminate whether the result is converging or not. */
- protected int mergeLimit;
-
- /*
- * last n training errors. converging is decided based on the average value of
- * these errors.
- */
- protected double[] trainingErrors;
-
- /*
- * If the average of last n training errors is smaller than this value, it is
- * converging.
- */
- protected double prevAvgTrainingError = Double.MAX_VALUE;
-
- /* current index for trainingErrors. */
- protected int curTrainingError = 0;
-
- /* how many merges have been conducted? */
- protected int mergeCount = 0;
-
- public ParameterMergerServer(SmallLayeredNeuralNetwork inMemoryModel,
- AtomicBoolean isConverge, int slaveCount, int mergeLimit,
- int convergenceCheckInterval) {
- this.inMemoryModel = inMemoryModel;
- this.isConverge = isConverge;
- this.SlaveCount = slaveCount;
- this.mergeLimit = mergeLimit;
- this.trainingErrors = new double[convergenceCheckInterval];
- }
-
- @Override
- public long getProtocolVersion(String s, long l) throws IOException {
- return ParameterMerger.versionID;
- }
-
- @Override
- public SmallLayeredNeuralNetworkMessage merge(
- SmallLayeredNeuralNetworkMessage msg) {
-
- double trainingError = msg.getTrainingError();
- DoubleMatrix[] weightUpdates = msg.getCurMatrices();
- DoubleMatrix[] prevWeightUpdates = msg.getPrevMatrices();
-
- Preconditions
- .checkArgument(weightUpdates.length == prevWeightUpdates.length);
-
- LOG.info("Start merging: " + this.mergeCount);
-
- if (!this.isConverge.get()) {
- for (int i = 0; i < weightUpdates.length; ++i) {
- weightUpdates[i] = weightUpdates[i].divide(this.SlaveCount);
- prevWeightUpdates[i] = prevWeightUpdates[i].divide(this.SlaveCount);
- }
-
- synchronized (inMemoryModel) {
- this.inMemoryModel.updateWeightMatrices(weightUpdates);
- this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
-
- // add trainingError to trainingErrors
- this.trainingErrors[this.curTrainingError++] = trainingError;
-
- // check convergence
- if (this.trainingErrors.length == this.curTrainingError) {
- double curAvgTrainingError = 0.0;
- for (int i = 0; i < this.curTrainingError; ++i) {
- curAvgTrainingError += this.trainingErrors[i];
- }
- curAvgTrainingError /= this.trainingErrors.length;
-
- if (prevAvgTrainingError < curAvgTrainingError) {
- this.isConverge.set(true);
- } else {
- // update
- prevAvgTrainingError = curAvgTrainingError;
- this.curTrainingError = 0;
- }
- }
-
- if (++this.mergeCount == this.mergeLimit) {
- this.isConverge.set(true);
- }
- }
- }
-
- return new SmallLayeredNeuralNetworkMessage(0, this.isConverge.get(),
- this.inMemoryModel.getWeightMatrices(),
- this.inMemoryModel.getPrevMatricesUpdates());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
deleted file mode 100644
index 0ea8e51..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,620 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.io.MatrixWritable;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.util.ReflectionUtils;
-import org.apache.horn.examples.MultiLayerPerceptron.StandardNeuron;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * SmallLayeredNeuralNetwork defines the general operations for derivative
- * layered models, include Linear Regression, Logistic Regression, Multilayer
- * Perceptron, Autoencoder, and Restricted Boltzmann Machine, etc. For
- * SmallLayeredNeuralNetwork, the training can be conducted in parallel, but the
- * parameters of the models are assumes to be stored in a single machine.
- *
- * In general, these models consist of neurons which are aligned in layers.
- * Between layers, for any two adjacent layers, the neurons are connected to
- * form a bipartite weighted graph.
- *
- */
-public class SmallLayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
-
- private static final Log LOG = LogFactory
- .getLog(SmallLayeredNeuralNetwork.class);
-
- public static Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>> neuronClass;
-
- /* Weights between neurons at adjacent layers */
- protected List<DoubleMatrix> weightMatrixList;
-
- /* Previous weight updates between neurons at adjacent layers */
- protected List<DoubleMatrix> prevWeightUpdatesList;
-
- /* Different layers can have different squashing function */
- protected List<DoubleFunction> squashingFunctionList;
-
- protected int finalLayerIdx;
-
- protected double regularizationWeight;
-
- public SmallLayeredNeuralNetwork() {
- this.layerSizeList = Lists.newArrayList();
- this.weightMatrixList = Lists.newArrayList();
- this.prevWeightUpdatesList = Lists.newArrayList();
- this.squashingFunctionList = Lists.newArrayList();
- }
-
- public SmallLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
- super(conf, modelPath);
- this.regularizationWeight = conf.getDouble("regularization.weight", 0);
- }
-
- @Override
- /**
- * {@inheritDoc}
- */
- public int addLayer(int size, boolean isFinalLayer,
- DoubleFunction squashingFunction) {
- Preconditions.checkArgument(size > 0,
- "Size of layer must be larger than 0.");
- if (!isFinalLayer) {
- size += 1;
- }
-
- LOG.info("Add Layer: " + size);
- this.layerSizeList.add(size);
- int layerIdx = this.layerSizeList.size() - 1;
- if (isFinalLayer) {
- this.finalLayerIdx = layerIdx;
- }
-
- // add weights between current layer and previous layer, and input layer has
- // no squashing function
- if (layerIdx > 0) {
- int sizePrevLayer = this.layerSizeList.get(layerIdx - 1);
- // row count equals to size of current size and column count equals to
- // size of previous layer
- int row = isFinalLayer ? size : size - 1;
- int col = sizePrevLayer;
- DoubleMatrix weightMatrix = new DenseDoubleMatrix(row, col);
- // initialize weights
- weightMatrix.applyToElements(new DoubleFunction() {
- @Override
- public double apply(double value) {
- return RandomUtils.nextDouble() - 0.5;
- }
-
- @Override
- public double applyDerivative(double value) {
- throw new UnsupportedOperationException("");
- }
- });
- this.weightMatrixList.add(weightMatrix);
- this.prevWeightUpdatesList.add(new DenseDoubleMatrix(row, col));
- this.squashingFunctionList.add(squashingFunction);
- }
- return layerIdx;
- }
-
- /**
- * Update the weight matrices with given matrices.
- *
- * @param matrices
- */
- public void updateWeightMatrices(DoubleMatrix[] matrices) {
- for (int i = 0; i < matrices.length; ++i) {
- DoubleMatrix matrix = this.weightMatrixList.get(i);
- this.weightMatrixList.set(i, matrix.add(matrices[i]));
- }
- }
-
- /**
- * Set the previous weight matrices.
- *
- * @param prevUpdates
- */
- void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) {
- this.prevWeightUpdatesList.clear();
- Collections.addAll(this.prevWeightUpdatesList, prevUpdates);
- }
-
- /**
- * Add a batch of matrices onto the given destination matrices.
- *
- * @param destMatrices
- * @param sourceMatrices
- */
- static void matricesAdd(DoubleMatrix[] destMatrices,
- DoubleMatrix[] sourceMatrices) {
- for (int i = 0; i < destMatrices.length; ++i) {
- destMatrices[i] = destMatrices[i].add(sourceMatrices[i]);
- }
- }
-
- /**
- * Get all the weight matrices.
- *
- * @return The matrices in form of matrix array.
- */
- DoubleMatrix[] getWeightMatrices() {
- DoubleMatrix[] matrices = new DoubleMatrix[this.weightMatrixList.size()];
- this.weightMatrixList.toArray(matrices);
- return matrices;
- }
-
- /**
- * Set the weight matrices.
- *
- * @param matrices
- */
- public void setWeightMatrices(DoubleMatrix[] matrices) {
- this.weightMatrixList = new ArrayList<DoubleMatrix>();
- Collections.addAll(this.weightMatrixList, matrices);
- }
-
- /**
- * Get the previous matrices updates in form of array.
- *
- * @return The matrices in form of matrix array.
- */
- public DoubleMatrix[] getPrevMatricesUpdates() {
- DoubleMatrix[] prevMatricesUpdates = new DoubleMatrix[this.prevWeightUpdatesList
- .size()];
- for (int i = 0; i < this.prevWeightUpdatesList.size(); ++i) {
- prevMatricesUpdates[i] = this.prevWeightUpdatesList.get(i);
- }
- return prevMatricesUpdates;
- }
-
- public void setWeightMatrix(int index, DoubleMatrix matrix) {
- Preconditions.checkArgument(
- 0 <= index && index < this.weightMatrixList.size(), String.format(
- "index [%d] should be in range[%d, %d].", index, 0,
- this.weightMatrixList.size()));
- this.weightMatrixList.set(index, matrix);
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
-
- // read squash functions
- int squashingFunctionSize = input.readInt();
- this.squashingFunctionList = Lists.newArrayList();
- for (int i = 0; i < squashingFunctionSize; ++i) {
- this.squashingFunctionList.add(FunctionFactory
- .createDoubleFunction(WritableUtils.readString(input)));
- }
-
- // read weights and construct matrices of previous updates
- int numOfMatrices = input.readInt();
- this.weightMatrixList = Lists.newArrayList();
- this.prevWeightUpdatesList = Lists.newArrayList();
- for (int i = 0; i < numOfMatrices; ++i) {
- DoubleMatrix matrix = MatrixWritable.read(input);
- this.weightMatrixList.add(matrix);
- this.prevWeightUpdatesList.add(new DenseDoubleMatrix(
- matrix.getRowCount(), matrix.getColumnCount()));
- }
-
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- super.write(output);
-
- // write squashing functions
- output.writeInt(this.squashingFunctionList.size());
- for (DoubleFunction aSquashingFunctionList : this.squashingFunctionList) {
- WritableUtils.writeString(output,
- aSquashingFunctionList.getFunctionName());
- }
-
- // write weight matrices
- output.writeInt(this.weightMatrixList.size());
- for (DoubleMatrix aWeightMatrixList : this.weightMatrixList) {
- MatrixWritable.write(aWeightMatrixList, output);
- }
-
- // DO NOT WRITE WEIGHT UPDATE
- }
-
- @Override
- public DoubleMatrix getWeightsByLayer(int layerIdx) {
- return this.weightMatrixList.get(layerIdx);
- }
-
- /**
- * Get the output of the model according to given feature instance.
- */
- @Override
- public DoubleVector getOutput(DoubleVector instance) {
- Preconditions.checkArgument(this.layerSizeList.get(0) - 1 == instance
- .getDimension(), String.format(
- "The dimension of input instance should be %d.",
- this.layerSizeList.get(0) - 1));
- // transform the features to another space
- DoubleVector transformedInstance = this.featureTransformer
- .transform(instance);
- // add bias feature
- DoubleVector instanceWithBias = new DenseDoubleVector(
- transformedInstance.getDimension() + 1);
- instanceWithBias.set(0, 0.99999); // set bias to be a little bit less than
- // 1.0
- for (int i = 1; i < instanceWithBias.getDimension(); ++i) {
- instanceWithBias.set(i, transformedInstance.get(i - 1));
- }
-
- List<DoubleVector> outputCache = getOutputInternal(instanceWithBias);
- // return the output of the last layer
- DoubleVector result = outputCache.get(outputCache.size() - 1);
- // remove bias
- return result.sliceUnsafe(1, result.getDimension() - 1);
- }
-
- /**
- * Calculate output internally, the intermediate output of each layer will be
- * stored.
- *
- * @param instanceWithBias The instance contains the features.
- * @return Cached output of each layer.
- */
- public List<DoubleVector> getOutputInternal(DoubleVector instanceWithBias) {
- List<DoubleVector> outputCache = new ArrayList<DoubleVector>();
- // fill with instance
- DoubleVector intermediateOutput = instanceWithBias;
- outputCache.add(intermediateOutput);
-
- for (int i = 0; i < this.layerSizeList.size() - 1; ++i) {
- intermediateOutput = forward(i, intermediateOutput);
- outputCache.add(intermediateOutput);
- }
-
- return outputCache;
- }
-
- /**
- * @return a new neuron instance
- */
- public static Neuron<Synapse<DoubleWritable, DoubleWritable>> newNeuronInstance() {
- return (Neuron<Synapse<DoubleWritable, DoubleWritable>>) ReflectionUtils
- .newInstance(neuronClass);
- }
-
- /**
- * Forward the calculation for one layer.
- *
- * @param fromLayer The index of the previous layer.
- * @param intermediateOutput The intermediateOutput of previous layer.
- * @return a new vector with the result of the operation.
- */
- protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) {
- DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
-
- neuronClass = (Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>>) conf
- .getClass("neuron.class", Neuron.class);
-
- // TODO use the multithread processing
- DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
- for (int row = 0; row < weightMatrix.getRowCount(); row++) {
- List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
- for (int col = 0; col < weightMatrix.getColumnCount(); col++) {
- msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
- new DoubleWritable(intermediateOutput.get(col)),
- new DoubleWritable(weightMatrix.get(row, col))));
- }
- Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
- Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
- n.setup(conf);
- n.setSquashingFunction(this.squashingFunctionList.get(fromLayer));
- try {
- n.forward(iterable);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- vec.set(row, n.getOutput());
- }
-
- // add bias
- DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
- vecWithBias.set(0, 1);
- for (int i = 0; i < vec.getDimension(); ++i) {
- vecWithBias.set(i + 1, vec.get(i));
- }
-
- return vecWithBias;
- }
-
- /**
- * Train the model online.
- *
- * @param trainingInstance
- */
- public void trainOnline(DoubleVector trainingInstance) {
- DoubleMatrix[] updateMatrices = this.trainByInstance(trainingInstance);
- this.updateWeightMatrices(updateMatrices);
- }
-
- @Override
- public DoubleMatrix[] trainByInstance(DoubleVector trainingInstance) {
- DoubleVector transformedVector = this.featureTransformer
- .transform(trainingInstance.sliceUnsafe(this.layerSizeList.get(0) - 1));
-
- int inputDimension = this.layerSizeList.get(0) - 1;
- int outputDimension;
- DoubleVector inputInstance = null;
- DoubleVector labels = null;
- if (this.learningStyle == LearningStyle.SUPERVISED) {
- outputDimension = this.layerSizeList.get(this.layerSizeList.size() - 1);
- // validate training instance
- Preconditions.checkArgument(
- inputDimension + outputDimension == trainingInstance.getDimension(),
- String
- .format(
- "The dimension of training instance is %d, but requires %d.",
- trainingInstance.getDimension(), inputDimension
- + outputDimension));
-
- inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
- inputInstance.set(0, 1); // add bias
- // get the features from the transformed vector
- for (int i = 0; i < inputDimension; ++i) {
- inputInstance.set(i + 1, transformedVector.get(i));
- }
- // get the labels from the original training instance
- labels = trainingInstance.sliceUnsafe(inputInstance.getDimension() - 1,
- trainingInstance.getDimension() - 1);
- } else if (this.learningStyle == LearningStyle.UNSUPERVISED) {
- // labels are identical to input features
- outputDimension = inputDimension;
- // validate training instance
- Preconditions.checkArgument(inputDimension == trainingInstance
- .getDimension(), String.format(
- "The dimension of training instance is %d, but requires %d.",
- trainingInstance.getDimension(), inputDimension));
-
- inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
- inputInstance.set(0, 1); // add bias
- // get the features from the transformed vector
- for (int i = 0; i < inputDimension; ++i) {
- inputInstance.set(i + 1, transformedVector.get(i));
- }
- // get the labels by copying the transformed vector
- labels = transformedVector.deepCopy();
- }
-
- List<DoubleVector> internalResults = this.getOutputInternal(inputInstance);
- DoubleVector output = internalResults.get(internalResults.size() - 1);
-
- // get the training error
- calculateTrainingError(labels,
- output.deepCopy().sliceUnsafe(1, output.getDimension() - 1));
-
- if (this.trainingMethod.equals(TrainingMethod.GRADIENT_DESCENT)) {
- return this.trainByInstanceGradientDescent(labels, internalResults);
- } else {
- throw new IllegalArgumentException(
- String.format("Training method is not supported."));
- }
- }
-
- /**
- * Train by gradient descent. Get the updated weights using one training
- * instance.
- *
- * @param trainingInstance
- * @return The weight update matrices.
- */
- private DoubleMatrix[] trainByInstanceGradientDescent(DoubleVector labels,
- List<DoubleVector> internalResults) {
-
- DoubleVector output = internalResults.get(internalResults.size() - 1);
- // initialize weight update matrices
- DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.weightMatrixList
- .size()];
- for (int m = 0; m < weightUpdateMatrices.length; ++m) {
- weightUpdateMatrices[m] = new DenseDoubleMatrix(this.weightMatrixList
- .get(m).getRowCount(), this.weightMatrixList.get(m).getColumnCount());
- }
- DoubleVector deltaVec = new DenseDoubleVector(
- this.layerSizeList.get(this.layerSizeList.size() - 1));
-
- DoubleFunction squashingFunction = this.squashingFunctionList
- .get(this.squashingFunctionList.size() - 1);
-
- DoubleMatrix lastWeightMatrix = this.weightMatrixList
- .get(this.weightMatrixList.size() - 1);
- for (int i = 0; i < deltaVec.getDimension(); ++i) {
- double costFuncDerivative = this.costFunction.applyDerivative(
- labels.get(i), output.get(i + 1));
- // add regularization
- costFuncDerivative += this.regularizationWeight
- * lastWeightMatrix.getRowVector(i).sum();
- deltaVec.set(
- i,
- costFuncDerivative
- * squashingFunction.applyDerivative(output.get(i + 1)));
- }
-
- // start from previous layer of output layer
- for (int layer = this.layerSizeList.size() - 2; layer >= 0; --layer) {
- output = internalResults.get(layer);
- deltaVec = backpropagate(layer, deltaVec, internalResults,
- weightUpdateMatrices[layer]);
- }
-
- this.setPrevWeightMatrices(weightUpdateMatrices);
-
- return weightUpdateMatrices;
- }
-
- /**
- * Back-propagate the errors to from next layer to current layer. The weight
- * updated information will be stored in the weightUpdateMatrices, and the
- * delta of the prevLayer would be returned.
- *
- * @param layer Index of current layer.
- * @param internalOutput Internal output of current layer.
- * @param deltaVec Delta of next layer.
- * @return the squashing function of the specified position.
- */
- private DoubleVector backpropagate(int curLayerIdx,
- DoubleVector nextLayerDelta, List<DoubleVector> outputCache,
- DenseDoubleMatrix weightUpdateMatrix) {
-
- // get layer related information
- DoubleVector curLayerOutput = outputCache.get(curLayerIdx);
- DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
- DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
-
- // next layer is not output layer, remove the delta of bias neuron
- if (curLayerIdx != this.layerSizeList.size() - 2) {
- nextLayerDelta = nextLayerDelta.slice(1,
- nextLayerDelta.getDimension() - 1);
- }
-
- // DoubleMatrix transposed = weightMatrix.transpose();
- DoubleVector deltaVector = new DenseDoubleVector(
- weightMatrix.getColumnCount());
- for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
- Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
- // calls setup method
- n.setup(conf);
- n.setSquashingFunction(this.squashingFunctionList.get(curLayerIdx));
- n.setOutput(curLayerOutput.get(row));
-
- List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
-
- n.setWeightVector(weightMatrix.getRowCount());
-
- for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
- // sum += (transposed.get(row, col) * nextLayerDelta.get(col));
- msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
- new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
- weightMatrix.get(col, row)), new DoubleWritable(
- prevWeightMatrix.get(col, row))));
- }
-
- Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
- try {
- n.backward(iterable);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // update weights
- weightUpdateMatrix.setColumn(row, n.getWeights());
- deltaVector.set(row, n.getDelta());
- }
-
- return deltaVector;
- }
-
- @Override
- protected BSPJob trainInternal(HamaConfiguration hamaConf)
- throws IOException, InterruptedException, ClassNotFoundException {
- this.conf = hamaConf;
- this.fs = FileSystem.get(conf);
-
- String modelPath = conf.get("model.path");
- if (modelPath != null) {
- this.modelPath = modelPath;
- }
- // modelPath must be set before training
- if (this.modelPath == null) {
- throw new IllegalArgumentException(
- "Please specify the modelPath for model, "
- + "either through setModelPath() or add 'modelPath' to the training parameters.");
- }
- this.writeModelToFile();
-
- // create job
- BSPJob job = new BSPJob(conf, SmallLayeredNeuralNetworkTrainer.class);
- job.setJobName("Small scale Neural Network training");
- job.setJarByClass(SmallLayeredNeuralNetworkTrainer.class);
- job.setBspClass(SmallLayeredNeuralNetworkTrainer.class);
-
- job.getConfiguration().setClass("neuron.class", StandardNeuron.class,
- Neuron.class);
-
- // additional for parameter server
- // TODO at this moment, we use 1 task as a parameter server
- // In the future, the number of parameter server should be configurable
- job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
-
- job.setInputPath(new Path(conf.get("training.input.path")));
- job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
- job.setInputKeyClass(LongWritable.class);
- job.setInputValueClass(VectorWritable.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(NullWritable.class);
- job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
-
- return job;
- }
-
- @Override
- protected void calculateTrainingError(DoubleVector labels, DoubleVector output) {
- DoubleVector errors = labels.deepCopy().applyToElements(output,
- this.costFunction);
- this.trainingError = errors.sum();
- }
-
- /**
- * Get the squashing function of a specified layer.
- *
- * @param idx
- * @return a new vector with the result of the operation.
- */
- public DoubleFunction getSquashingFunction(int idx) {
- return this.squashingFunctionList.get(idx);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
deleted file mode 100644
index 2f8c287..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.commons.io.MatrixWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-
-/**
- * NeuralNetworkMessage transmits the messages between peers during the training
- * of neural networks.
- *
- */
-public class SmallLayeredNeuralNetworkMessage implements Writable {
-
- protected double trainingError;
- protected DoubleMatrix[] curMatrices;
- protected DoubleMatrix[] prevMatrices;
- protected boolean converge;
-
- public SmallLayeredNeuralNetworkMessage() {
- }
-
- public SmallLayeredNeuralNetworkMessage(double trainingError,
- boolean converge, DoubleMatrix[] weightMatrices,
- DoubleMatrix[] prevMatrices) {
- this.trainingError = trainingError;
- this.converge = converge;
- this.curMatrices = weightMatrices;
- this.prevMatrices = prevMatrices;
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- trainingError = input.readDouble();
- converge = input.readBoolean();
- int numMatrices = input.readInt();
- boolean hasPrevMatrices = input.readBoolean();
- curMatrices = new DenseDoubleMatrix[numMatrices];
- // read matrice updates
- for (int i = 0; i < curMatrices.length; ++i) {
- curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
- }
-
- if (hasPrevMatrices) {
- prevMatrices = new DenseDoubleMatrix[numMatrices];
- // read previous matrices updates
- for (int i = 0; i < prevMatrices.length; ++i) {
- prevMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
- }
- }
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeDouble(trainingError);
- output.writeBoolean(converge);
- output.writeInt(curMatrices.length);
- if (prevMatrices == null) {
- output.writeBoolean(false);
- } else {
- output.writeBoolean(true);
- }
- for (DoubleMatrix matrix : curMatrices) {
- MatrixWritable.write(matrix, output);
- }
- if (prevMatrices != null) {
- for (DoubleMatrix matrix : prevMatrices) {
- MatrixWritable.write(matrix, output);
- }
- }
- }
-
- public double getTrainingError() {
- return trainingError;
- }
-
- public void setTrainingError(double trainingError) {
- this.trainingError = trainingError;
- }
-
- public boolean isConverge() {
- return converge;
- }
-
- public void setConverge(boolean converge) {
- this.converge = converge;
- }
-
- public DoubleMatrix[] getCurMatrices() {
- return curMatrices;
- }
-
- public void setMatrices(DoubleMatrix[] curMatrices) {
- this.curMatrices = curMatrices;
- }
-
- public DoubleMatrix[] getPrevMatrices() {
- return prevMatrices;
- }
-
- public void setPrevMatrices(DoubleMatrix[] prevMatrices) {
- this.prevMatrices = prevMatrices;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
deleted file mode 100644
index c3e258c..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ipc.RPC;
-
-import com.google.common.base.Preconditions;
-
-/**
- * The trainer that train the {@link SmallLayeredNeuralNetwork} based on BSP
- * framework.
- *
- */
-public final class SmallLayeredNeuralNetworkTrainer
- extends
- BSP<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> {
-
- private static final Log LOG = LogFactory
- .getLog(SmallLayeredNeuralNetworkTrainer.class);
-
- /* When given peer is master worker: base of parameter merge */
- /* When given peer is slave worker: neural network for training */
- private SmallLayeredNeuralNetwork inMemoryModel;
-
- /* Job configuration */
- private HamaConfiguration conf;
-
- /* Default batch size */
- private int batchSize;
-
- /* whether it is converging or not */
- private AtomicBoolean isConverge;
-
- /* When given peer is master worker: Asynchronous parameter merger */
- /* When given peer is slave worker: null */
- private RPC.Server merger;
-
- /* When given peer is master worker: null */
- /* When given peer is slave worker: proxy to Asynchronous parameter merger */
- private ParameterMerger proxy;
-
- /**
- * Returns true if this worker is master worker.
- *
- * @param peer
- * */
- private boolean isMaster(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
- return peer.getPeerIndex() == peer.getNumPeers() - 1;
- }
-
- @Override
- /**
- * If the model path is specified, load the existing from storage location.
- */
- public void setup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
- // At least one master & slave worker exist.
- Preconditions.checkArgument(peer.getNumPeers() >= 2);
- this.conf = peer.getConfiguration();
-
- String modelPath = conf.get("model.path");
- this.inMemoryModel = new SmallLayeredNeuralNetwork(conf, modelPath);
-
- this.batchSize = conf.getInt("training.batch.size", 50);
- this.isConverge = new AtomicBoolean(false);
-
- int slaveCount = peer.getNumPeers() - 1;
- int mergeLimit = conf.getInt("training.max.iterations", 100000);
- int convergenceCheckInterval = peer.getNumPeers()
- * conf.getInt("convergence.check.interval", 2000);
- String master = peer.getPeerName();
- String masterAddr = master.substring(0, master.indexOf(':'));
- int port = conf.getInt("sync.server.port", 40052);
-
- if (isMaster(peer)) {
- try {
- this.merger = RPC.getServer(new ParameterMergerServer(inMemoryModel,
- isConverge, slaveCount, mergeLimit, convergenceCheckInterval),
- masterAddr, port, conf);
- merger.start();
- } catch (IOException e) {
- e.printStackTrace();
- }
- LOG.info("Begin to train");
- } else {
- InetSocketAddress addr = new InetSocketAddress(masterAddr, port);
- try {
- this.proxy = (ParameterMerger) RPC.getProxy(ParameterMerger.class,
- ParameterMerger.versionID, addr, conf);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- /**
- * Write the trained model back to stored location.
- */
- public void cleanup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
- // write model to modelPath
- if (isMaster(peer)) {
- try {
- LOG.info("Write model back to " + inMemoryModel.getModelPath());
- this.inMemoryModel.writeModelToFile();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public void bsp(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer)
- throws IOException, SyncException, InterruptedException {
- while (!this.isConverge.get()) {
- // each slave-worker calculate the matrices updates according to local
- // data
- // and merge them with master
- if (!isMaster(peer)) {
- calculateUpdates(peer);
- }
- }
-
- if (isMaster(peer)) {
- merger.stop();
- }
- peer.sync(); // finalize the bsp program.
- }
-
- /**
- * Calculate the matrices updates according to local partition of data.
- *
- * @param peer
- * @throws IOException
- */
- private void calculateUpdates(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer)
- throws IOException {
-
- DoubleMatrix[] weightUpdates = new DoubleMatrix[this.inMemoryModel.weightMatrixList
- .size()];
- for (int i = 0; i < weightUpdates.length; ++i) {
- int row = this.inMemoryModel.weightMatrixList.get(i).getRowCount();
- int col = this.inMemoryModel.weightMatrixList.get(i).getColumnCount();
- weightUpdates[i] = new DenseDoubleMatrix(row, col);
- }
-
- // continue to train
- double avgTrainingError = 0.0;
- LongWritable key = new LongWritable();
- VectorWritable value = new VectorWritable();
- for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) {
- if (!peer.readNext(key, value)) {
- peer.reopenInput();
- peer.readNext(key, value);
- }
- DoubleVector trainingInstance = value.getVector();
- SmallLayeredNeuralNetwork.matricesAdd(weightUpdates,
- this.inMemoryModel.trainByInstance(trainingInstance));
- avgTrainingError += this.inMemoryModel.trainingError;
- }
- avgTrainingError /= batchSize;
-
- // calculate the average of updates
- for (int i = 0; i < weightUpdates.length; ++i) {
- weightUpdates[i] = weightUpdates[i].divide(batchSize);
- }
-
- // exchange parameter update with master
- SmallLayeredNeuralNetworkMessage msg = new SmallLayeredNeuralNetworkMessage(
- avgTrainingError, false, weightUpdates,
- this.inMemoryModel.getPrevMatricesUpdates());
-
- SmallLayeredNeuralNetworkMessage inMessage = proxy.merge(msg);
- DoubleMatrix[] newWeights = inMessage.getCurMatrices();
- DoubleMatrix[] preWeightUpdates = inMessage.getPrevMatrices();
- this.inMemoryModel.setWeightMatrices(newWeights);
- this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates);
- this.isConverge.set(inMessage.isConverge());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/Synapse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/Synapse.java b/src/main/java/org/apache/horn/bsp/Synapse.java
deleted file mode 100644
index 61725f9..0000000
--- a/src/main/java/org/apache/horn/bsp/Synapse.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Message wrapper for a propagating message
- */
-public class Synapse<M extends Writable, W extends Writable> implements
- Writable {
-
- DoubleWritable message;
- DoubleWritable weight;
- DoubleWritable prevWeight;
-
- public Synapse(DoubleWritable message, DoubleWritable weight) {
- this.message = message;
- this.weight = weight;
- }
-
- public Synapse(DoubleWritable message, DoubleWritable weight, DoubleWritable prevWeight) {
- this.message = message;
- this.weight = weight;
- this.prevWeight = prevWeight;
- }
-
- /**
- * @return the activation or error message
- */
- public double getMessage() {
- return message.get();
- }
-
- public double getInput() {
- // returns the input
- return message.get();
- }
-
- public double getDelta() {
- // returns the delta
- return message.get();
- }
-
- public double getWeight() {
- return weight.get();
- }
-
- public double getPrevWeight() {
- return prevWeight.get();
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- message.readFields(in);
- weight.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- message.write(out);
- weight.write(out);
- }
-
-}
\ No newline at end of file