Posted to dev@horn.apache.org by ed...@apache.org on 2016/04/26 05:46:22 UTC
[1/4] incubator-horn git commit: Code refactoring
Repository: incubator-horn
Updated Branches:
refs/heads/master 277773c0a -> ac8eaf8e1
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
new file mode 100644
index 0000000..7e4328f
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetwork.java
@@ -0,0 +1,642 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.ml.util.DefaultFeatureTransformer;
+import org.apache.hama.ml.util.FeatureTransformer;
+import org.apache.horn.core.AbstractLayeredNeuralNetwork.LearningStyle;
+import org.apache.horn.core.AbstractLayeredNeuralNetwork.TrainingMethod;
+import org.apache.horn.funcs.FunctionFactory;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+/**
+ * Test the functionality of {@link LayeredNeuralNetwork}.
+ */
+public class TestSmallLayeredNeuralNetwork extends MLTestBase {
+
+ @Test
+ public void testReadWrite() {
+ LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+ ann.addLayer(2, false,
+ FunctionFactory.createDoubleFunction("IdentityFunction"));
+ ann.addLayer(5, false,
+ FunctionFactory.createDoubleFunction("IdentityFunction"));
+ ann.addLayer(1, true,
+ FunctionFactory.createDoubleFunction("IdentityFunction"));
+ ann.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("SquaredError"));
+ double learningRate = 0.2;
+ // ann.setLearningRate(learningRate);
+ double momentumWeight = 0.5;
+ // ann.setMomemtumWeight(momentumWeight);
+ double regularizationWeight = 0.05;
+ //ann.setRegularizationWeight(regularizationWeight);
+ // intentionally initialize the weights to fixed values (0.2 and 0.8)
+ DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
+ matrices[0] = new DenseDoubleMatrix(5, 3, 0.2);
+ matrices[1] = new DenseDoubleMatrix(1, 6, 0.8);
+ ann.setWeightMatrices(matrices);
+ ann.setLearningStyle(LearningStyle.UNSUPERVISED);
+
+ FeatureTransformer defaultFeatureTransformer = new DefaultFeatureTransformer();
+ ann.setFeatureTransformer(defaultFeatureTransformer);
+
+ // write to file
+ String modelPath = "/tmp/testSmallLayeredNeuralNetworkReadWrite";
+ ann.setModelPath(modelPath);
+ try {
+ ann.writeModelToFile();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ // read from file
+ LayeredNeuralNetwork annCopy = new LayeredNeuralNetwork(new HamaConfiguration(), modelPath);
+ assertEquals(annCopy.getClass().getSimpleName(), annCopy.getModelType());
+ assertEquals(modelPath, annCopy.getModelPath());
+ // assertEquals(learningRate, annCopy.getLearningRate(), 0.000001);
+ // assertEquals(momentumWeight, annCopy.getMomemtumWeight(), 0.000001);
+ //assertEquals(regularizationWeight, annCopy.getRegularizationWeight(),
+ // 0.000001);
+ assertEquals(TrainingMethod.GRADIENT_DESCENT, annCopy.getTrainingMethod());
+ assertEquals(LearningStyle.UNSUPERVISED, annCopy.getLearningStyle());
+
+ // compare weights
+ DoubleMatrix[] weightsMatrices = annCopy.getWeightMatrices();
+ for (int i = 0; i < weightsMatrices.length; ++i) {
+ DoubleMatrix expectMat = matrices[i];
+ DoubleMatrix actualMat = weightsMatrices[i];
+ for (int j = 0; j < expectMat.getRowCount(); ++j) {
+ for (int k = 0; k < expectMat.getColumnCount(); ++k) {
+ assertEquals(expectMat.get(j, k), actualMat.get(j, k), 0.000001);
+ }
+ }
+ }
+
+ FeatureTransformer copyTransformer = annCopy.getFeatureTransformer();
+ assertEquals(defaultFeatureTransformer.getClass().getName(), copyTransformer.getClass().getName());
+ }
+
+ /**
+ * Test the forward functionality.
+ */
+ @Test
+ public void testOutput() {
+ // first network
+ LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+ ann.addLayer(2, false,
+ FunctionFactory.createDoubleFunction("IdentityFunction"));
+ ann.addLayer(5, false,
+ FunctionFactory.createDoubleFunction("IdentityFunction"));
+ ann.addLayer(1, true,
+ FunctionFactory.createDoubleFunction("IdentityFunction"));
+ ann.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("SquaredError"));
+ // ann.setLearningRate(0.1);
+ // intentionally initialize all weights to 0.5
+ DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
+ matrices[0] = new DenseDoubleMatrix(5, 3, 0.5);
+ matrices[1] = new DenseDoubleMatrix(1, 6, 0.5);
+ ann.setWeightMatrices(matrices);
+
+ double[] arr = new double[] { 0, 1 };
+ DoubleVector training = new DenseDoubleVector(arr);
+ DoubleVector result = ann.getOutput(training);
+ assertEquals(1, result.getDimension());
+ // assertEquals(3, result.get(0), 0.000001);
+
+ // second network
+ LayeredNeuralNetwork ann2 = new LayeredNeuralNetwork();
+ ann2.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann2.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann2.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann2.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("SquaredError"));
+ // ann2.setLearningRate(0.3);
+ // intentionally initialize all weights to 0.5
+ DoubleMatrix[] matrices2 = new DenseDoubleMatrix[2];
+ matrices2[0] = new DenseDoubleMatrix(3, 3, 0.5);
+ matrices2[1] = new DenseDoubleMatrix(1, 4, 0.5);
+ ann2.setWeightMatrices(matrices2);
+
+ double[] test = { 0, 0 };
+ double[] result2 = { 0.807476 };
+
+ DoubleVector vec = ann2.getOutput(new DenseDoubleVector(test));
+ assertArrayEquals(result2, vec.toArray(), 0.000001);
+
+ LayeredNeuralNetwork ann3 = new LayeredNeuralNetwork();
+ ann3.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann3.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann3.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann3.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("SquaredError"));
+ // ann3.setLearningRate(0.3);
+ // intentionally initialize all weights to 0.5
+ DoubleMatrix[] initMatrices = new DenseDoubleMatrix[2];
+ initMatrices[0] = new DenseDoubleMatrix(3, 3, 0.5);
+ initMatrices[1] = new DenseDoubleMatrix(1, 4, 0.5);
+ ann3.setWeightMatrices(initMatrices);
+
+ double[] instance = { 0, 1 };
+ DoubleVector output = ann3.getOutput(new DenseDoubleVector(instance));
+ assertEquals(0.8315410, output.get(0), 0.000001);
+ }
+
+ @Test
+ public void testXORlocal() {
+ LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+ ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("SquaredError"));
+ // ann.setLearningRate(0.5);
+ // ann.setMomemtumWeight(0.0);
+
+ int iterations = 50000; // the iteration count should be set to a very large number
+ double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
+ for (int i = 0; i < iterations; ++i) {
+ DoubleMatrix[] matrices = null;
+ for (int j = 0; j < instances.length; ++j) {
+ matrices = ann.trainByInstance(new DenseDoubleVector(instances[j
+ % instances.length]));
+ ann.updateWeightMatrices(matrices);
+ }
+ }
+
+ for (int i = 0; i < instances.length; ++i) {
+ DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+ // the expected output is the last element in array
+ double result = instances[i][2];
+ double actual = ann.getOutput(input).get(0);
+ if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+ Log.info("Neural network failes to lear the XOR.");
+ }
+ }
+
+ // write model into file and read out
+ String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocal";
+ ann.setModelPath(modelPath);
+ try {
+ ann.writeModelToFile();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ LayeredNeuralNetwork annCopy = new LayeredNeuralNetwork(new HamaConfiguration(), modelPath);
+ // test on instances
+ for (int i = 0; i < instances.length; ++i) {
+ DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+ // the expected output is the last element in array
+ double result = instances[i][2];
+ double actual = annCopy.getOutput(input).get(0);
+ if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+ Log.info("Neural network failes to lear the XOR.");
+ }
+ }
+ }
+
+ @Test
+ public void testXORWithMomentum() {
+ LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+ ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("SquaredError"));
+ // ann.setLearningRate(0.6);
+ // ann.setMomemtumWeight(0.3);
+
+ int iterations = 2000; // the iteration count should be set to a very large number
+ double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
+ for (int i = 0; i < iterations; ++i) {
+ for (int j = 0; j < instances.length; ++j) {
+ ann.trainOnline(new DenseDoubleVector(instances[j % instances.length]));
+ }
+ }
+
+ for (int i = 0; i < instances.length; ++i) {
+ DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+ // the expected output is the last element in array
+ double result = instances[i][2];
+ double actual = ann.getOutput(input).get(0);
+ if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+ Log.info("Neural network failes to lear the XOR.");
+ }
+ }
+
+ // write model into file and read out
+ String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithMomentum";
+ ann.setModelPath(modelPath);
+ try {
+ ann.writeModelToFile();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ LayeredNeuralNetwork annCopy = new LayeredNeuralNetwork(new HamaConfiguration(), modelPath);
+ // test on instances
+ for (int i = 0; i < instances.length; ++i) {
+ DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+ // the expected output is the last element in array
+ double result = instances[i][2];
+ double actual = annCopy.getOutput(input).get(0);
+ if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+ Log.info("Neural network failes to lear the XOR.");
+ }
+ }
+ }
+
+ @Test
+ public void testXORLocalWithRegularization() {
+ LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+ ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("SquaredError"));
+ // ann.setLearningRate(0.7);
+ // ann.setMomemtumWeight(0.5);
+ //ann.setRegularizationWeight(0.002);
+
+ int iterations = 5000; // the iteration count should be set to a very large number
+ double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
+ for (int i = 0; i < iterations; ++i) {
+ for (int j = 0; j < instances.length; ++j) {
+ ann.trainOnline(new DenseDoubleVector(instances[j % instances.length]));
+ }
+ }
+
+ for (int i = 0; i < instances.length; ++i) {
+ DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+ // the expected output is the last element in array
+ double result = instances[i][2];
+ double actual = ann.getOutput(input).get(0);
+ if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+ Log.info("Neural network failes to lear the XOR.");
+ }
+ }
+
+ // write model into file and read out
+ String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithRegularization";
+ ann.setModelPath(modelPath);
+ try {
+ ann.writeModelToFile();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ LayeredNeuralNetwork annCopy = new LayeredNeuralNetwork(new HamaConfiguration(), modelPath);
+ // test on instances
+ for (int i = 0; i < instances.length; ++i) {
+ DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
+ // the expected output is the last element in array
+ double result = instances[i][2];
+ double actual = annCopy.getOutput(input).get(0);
+ if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
+ Log.info("Neural network failes to lear the XOR.");
+ }
+ }
+ }
+
+ @Test
+ public void testTwoClassClassification() {
+ // use logistic regression data
+ String filepath = "src/test/resources/logistic_regression_data.txt";
+ List<double[]> instanceList = new ArrayList<double[]>();
+
+ try {
+ BufferedReader br = new BufferedReader(new FileReader(filepath));
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ String[] tokens = line.trim().split(",");
+ double[] instance = new double[tokens.length];
+ for (int i = 0; i < tokens.length; ++i) {
+ instance[i] = Double.parseDouble(tokens[i]);
+ }
+ instanceList.add(instance);
+ }
+ br.close();
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
+
+ int dimension = instanceList.get(0).length - 1;
+
+ // divide dataset into training and testing
+ List<double[]> testInstances = new ArrayList<double[]>();
+ testInstances.addAll(instanceList.subList(instanceList.size() - 100,
+ instanceList.size()));
+ List<double[]> trainingInstances = instanceList.subList(0,
+ instanceList.size() - 100);
+
+ LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+ // ann.setLearningRate(0.001);
+ // ann.setMomemtumWeight(0.1);
+ //ann.setRegularizationWeight(0.01);
+ ann.addLayer(dimension, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(dimension, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(dimension, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("CrossEntropy"));
+
+ long start = new Date().getTime();
+ int iterations = 1000;
+ for (int i = 0; i < iterations; ++i) {
+ for (double[] trainingInstance : trainingInstances) {
+ ann.trainOnline(new DenseDoubleVector(trainingInstance));
+ }
+ }
+ long end = new Date().getTime();
+ Log.info(String.format("Training time: %fs\n",
+ (double) (end - start) / 1000));
+
+ double errorRate = 0;
+ // calculate the error rate on the test instances
+ for (double[] testInstance : testInstances) {
+ DoubleVector instance = new DenseDoubleVector(testInstance);
+ double expected = instance.get(instance.getDimension() - 1);
+ instance = instance.slice(instance.getDimension() - 1);
+ double actual = ann.getOutput(instance).get(0);
+ if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
+ ++errorRate;
+ }
+ }
+ errorRate /= testInstances.size();
+
+ Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
+ }
+
+ @Test
+ public void testLogisticRegression() {
+ this.testLogisticRegressionDistributedVersion();
+ this.testLogisticRegressionDistributedVersionWithFeatureTransformer();
+ }
+
+ public void testLogisticRegressionDistributedVersion() {
+ // write data into a sequence file
+ String tmpStrDatasetPath = "/tmp/logistic_regression_data";
+ Path tmpDatasetPath = new Path(tmpStrDatasetPath);
+ String strDataPath = "src/test/resources/logistic_regression_data.txt";
+ String modelPath = "/tmp/logistic-regression-distributed-model";
+
+ Configuration conf = new Configuration();
+ List<double[]> instanceList = new ArrayList<double[]>();
+ List<double[]> trainingInstances = null;
+ List<double[]> testInstances = null;
+
+ try {
+ FileSystem fs = FileSystem.get(new URI(tmpStrDatasetPath), conf);
+ fs.delete(tmpDatasetPath, true);
+ if (!fs.exists(tmpDatasetPath)) {
+ fs.createNewFile(tmpDatasetPath);
+ }
+
+ BufferedReader br = new BufferedReader(new FileReader(strDataPath));
+ String line = null;
+ int count = 0;
+ while ((line = br.readLine()) != null) {
+ String[] tokens = line.trim().split(",");
+ double[] instance = new double[tokens.length];
+ for (int i = 0; i < tokens.length; ++i) {
+ instance[i] = Double.parseDouble(tokens[i]);
+ }
+ instanceList.add(instance);
+ }
+ br.close();
+
+ zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
+
+ // write training data to a temporary sequence file
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+ tmpDatasetPath, LongWritable.class, VectorWritable.class);
+ int testSize = 150;
+
+ Collections.shuffle(instanceList);
+ testInstances = new ArrayList<double[]>();
+ testInstances.addAll(instanceList.subList(instanceList.size() - testSize,
+ instanceList.size()));
+ trainingInstances = instanceList.subList(0, instanceList.size()
+ - testSize);
+
+ for (double[] instance : trainingInstances) {
+ DoubleVector vec = new DenseDoubleVector(instance);
+ writer.append(new LongWritable(count++), new VectorWritable(vec));
+ }
+ writer.close();
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
+
+ // create model
+ int dimension = 8;
+ LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+ // ann.setLearningRate(0.7);
+ // ann.setMomemtumWeight(0.5);
+ //ann.setRegularizationWeight(0.1);
+ ann.addLayer(dimension, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(dimension, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(dimension, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("CrossEntropy"));
+ ann.setModelPath(modelPath);
+
+ long start = new Date().getTime();
+ Map<String, String> trainingParameters = new HashMap<String, String>();
+ trainingParameters.put("tasks", "5");
+ trainingParameters.put("training.max.iterations", "2000");
+ trainingParameters.put("training.batch.size", "300");
+ trainingParameters.put("convergence.check.interval", "1000");
+ //ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
+
+ long end = new Date().getTime();
+
+ // validate results
+ double errorRate = 0;
+ // calculate the error rate on the test instances
+ for (double[] testInstance : testInstances) {
+ DoubleVector instance = new DenseDoubleVector(testInstance);
+ double expected = instance.get(instance.getDimension() - 1);
+ instance = instance.slice(instance.getDimension() - 1);
+ double actual = ann.getOutput(instance).get(0);
+ if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
+ ++errorRate;
+ }
+ }
+ errorRate /= testInstances.size();
+
+ Log.info(String.format("Training time: %fs\n",
+ (double) (end - start) / 1000));
+ Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
+ }
+
+ public void testLogisticRegressionDistributedVersionWithFeatureTransformer() {
+ // write data into a sequence file
+ String tmpStrDatasetPath = "/tmp/logistic_regression_data_feature_transformer";
+ Path tmpDatasetPath = new Path(tmpStrDatasetPath);
+ String strDataPath = "src/test/resources/logistic_regression_data.txt";
+ String modelPath = "/tmp/logistic-regression-distributed-model-feature-transformer";
+
+ Configuration conf = new Configuration();
+ List<double[]> instanceList = new ArrayList<double[]>();
+ List<double[]> trainingInstances = null;
+ List<double[]> testInstances = null;
+
+ try {
+ FileSystem fs = FileSystem.get(new URI(tmpStrDatasetPath), conf);
+ fs.delete(tmpDatasetPath, true);
+ if (!fs.exists(tmpDatasetPath)) {
+ fs.createNewFile(tmpDatasetPath);
+ }
+
+ BufferedReader br = new BufferedReader(new FileReader(strDataPath));
+ String line = null;
+ int count = 0;
+ while ((line = br.readLine()) != null) {
+ String[] tokens = line.trim().split(",");
+ double[] instance = new double[tokens.length];
+ for (int i = 0; i < tokens.length; ++i) {
+ instance[i] = Double.parseDouble(tokens[i]);
+ }
+ instanceList.add(instance);
+ }
+ br.close();
+
+ zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
+
+ // write training data to a temporary sequence file
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+ tmpDatasetPath, LongWritable.class, VectorWritable.class);
+ int testSize = 150;
+
+ Collections.shuffle(instanceList);
+ testInstances = new ArrayList<double[]>();
+ testInstances.addAll(instanceList.subList(instanceList.size() - testSize,
+ instanceList.size()));
+ trainingInstances = instanceList.subList(0, instanceList.size()
+ - testSize);
+
+ for (double[] instance : trainingInstances) {
+ DoubleVector vec = new DenseDoubleVector(instance);
+ writer.append(new LongWritable(count++), new VectorWritable(vec));
+ }
+ writer.close();
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
+
+ // create model
+ int dimension = 8;
+ LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
+ // ann.setLearningRate(0.7);
+ // ann.setMomemtumWeight(0.5);
+ //ann.setRegularizationWeight(0.1);
+ ann.addLayer(dimension, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(dimension, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(dimension, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
+ ann.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("CrossEntropy"));
+ ann.setModelPath(modelPath);
+
+ FeatureTransformer featureTransformer = new DefaultFeatureTransformer();
+
+ ann.setFeatureTransformer(featureTransformer);
+
+ long start = new Date().getTime();
+ Map<String, String> trainingParameters = new HashMap<String, String>();
+ trainingParameters.put("tasks", "5");
+ trainingParameters.put("training.max.iterations", "2000");
+ trainingParameters.put("training.batch.size", "300");
+ trainingParameters.put("convergence.check.interval", "1000");
+ //ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
+
+ long end = new Date().getTime();
+
+ // validate results
+ double errorRate = 0;
+ // calculate the error rate on the test instances
+ for (double[] testInstance : testInstances) {
+ DoubleVector instance = new DenseDoubleVector(testInstance);
+ double expected = instance.get(instance.getDimension() - 1);
+ instance = instance.slice(instance.getDimension() - 1);
+ instance = featureTransformer.transform(instance);
+ double actual = ann.getOutput(instance).get(0);
+ if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
+ ++errorRate;
+ }
+ }
+ errorRate /= testInstances.size();
+
+ Log.info(String.format("Training time: %fs\n",
+ (double) (end - start) / 1000));
+ Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
+ }
+
+}
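
The XOR tests in this file reduce to a single usage pattern for the refactored
LayeredNeuralNetwork API. The following is a minimal sketch based only on the
calls exercised above (addLayer, setCostFunction, trainOnline, getOutput); it
is illustrative, not a verified excerpt of the repository:

    LayeredNeuralNetwork ann = new LayeredNeuralNetwork();
    ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
    ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
    ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
    ann.setCostFunction(FunctionFactory.createDoubleDoubleFunction("SquaredError"));

    // each instance is [x1, x2, label]; train online for many epochs
    double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
    for (int epoch = 0; epoch < 50000; ++epoch) {
      for (double[] instance : instances) {
        ann.trainOnline(new DenseDoubleVector(instance));
      }
    }

    // query: slice(2) keeps the two features and drops the label
    double out = ann.getOutput(new DenseDoubleVector(instances[0]).slice(2)).get(0);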
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetworkMessage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetworkMessage.java b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetworkMessage.java
new file mode 100644
index 0000000..a0c66d2
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/TestSmallLayeredNeuralNetworkMessage.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.horn.core.ParameterMessage;
+import org.junit.Test;
+
+/**
+ * Test the functionality of {@link ParameterMessage}.
+ */
+public class TestSmallLayeredNeuralNetworkMessage {
+
+ @Test
+ public void testReadWriteWithoutPrev() {
+ double error = 0.22;
+ double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 },
+ { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } };
+ double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } };
+ DoubleMatrix[] matrices = new DoubleMatrix[2];
+ matrices[0] = new DenseDoubleMatrix(matrix1);
+ matrices[1] = new DenseDoubleMatrix(matrix2);
+
+ boolean isConverge = false;
+
+ ParameterMessage message = new ParameterMessage(
+ error, isConverge, matrices, null);
+ Configuration conf = new Configuration();
+ String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessage";
+ Path path = new Path(strPath);
+ try {
+ FileSystem fs = FileSystem.get(new URI(strPath), conf);
+ FSDataOutputStream out = fs.create(path);
+ message.write(out);
+ out.close();
+
+ FSDataInputStream in = fs.open(path);
+ ParameterMessage readMessage = new ParameterMessage(
+ 0, isConverge, null, null);
+ readMessage.readFields(in);
+ in.close();
+ assertEquals(error, readMessage.getTrainingError(), 0.000001);
+ assertFalse(readMessage.isConverge());
+ DoubleMatrix[] readMatrices = readMessage.getCurMatrices();
+ assertEquals(2, readMatrices.length);
+ for (int i = 0; i < readMatrices.length; ++i) {
+ double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i])
+ .getValues();
+ double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i])
+ .getValues();
+ for (int r = 0; r < doubleMatrices.length; ++r) {
+ assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
+ }
+ }
+
+ DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices();
+ assertNull(readPrevMatrices);
+
+ // delete
+ fs.delete(path, true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testReadWriteWithPrev() {
+ double error = 0.22;
+ boolean isConverge = true;
+
+ double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 },
+ { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } };
+ double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } };
+ DoubleMatrix[] matrices = new DoubleMatrix[2];
+ matrices[0] = new DenseDoubleMatrix(matrix1);
+ matrices[1] = new DenseDoubleMatrix(matrix2);
+
+ double[][] prevMatrix1 = new double[][] { { 0.1, 0.1, 0.2, 0.3 },
+ { 0.2, 0.4, 0.1, 0.5 }, { 0.5, 0.1, 0.5, 0.2 } };
+ double[][] prevMatrix2 = new double[][] { { 0.1, 0.2, 0.5, 0.9 },
+ { 0.3, 0.5, 0.2, 0.6 }, { 0.6, 0.8, 0.7, 0.5 } };
+
+ DoubleMatrix[] prevMatrices = new DoubleMatrix[2];
+ prevMatrices[0] = new DenseDoubleMatrix(prevMatrix1);
+ prevMatrices[1] = new DenseDoubleMatrix(prevMatrix2);
+
+ ParameterMessage message = new ParameterMessage(
+ error, isConverge, matrices, prevMatrices);
+ Configuration conf = new Configuration();
+ String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessageWithPrev";
+ Path path = new Path(strPath);
+ try {
+ FileSystem fs = FileSystem.get(new URI(strPath), conf);
+ FSDataOutputStream out = fs.create(path);
+ message.write(out);
+ out.close();
+
+ FSDataInputStream in = fs.open(path);
+ ParameterMessage readMessage = new ParameterMessage(
+ 0, isConverge, null, null);
+ readMessage.readFields(in);
+ in.close();
+
+ assertTrue(readMessage.isConverge());
+
+ DoubleMatrix[] readMatrices = readMessage.getCurMatrices();
+ assertEquals(2, readMatrices.length);
+ for (int i = 0; i < readMatrices.length; ++i) {
+ double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i])
+ .getValues();
+ double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i])
+ .getValues();
+ for (int r = 0; r < doubleMatrices.length; ++r) {
+ assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
+ }
+ }
+
+ DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices();
+ assertEquals(2, readPrevMatrices.length);
+ for (int i = 0; i < readPrevMatrices.length; ++i) {
+ double[][] doubleMatrices = ((DenseDoubleMatrix) readPrevMatrices[i])
+ .getValues();
+ double[][] doubleExpected = ((DenseDoubleMatrix) prevMatrices[i])
+ .getValues();
+ for (int r = 0; r < doubleMatrices.length; ++r) {
+ assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
+ }
+ }
+
+ // delete
+ fs.delete(path, true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
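
Both tests above round-trip a ParameterMessage through an HDFS file. The same
Writable contract can be checked in memory with Hadoop's DataOutputBuffer and
DataInputBuffer (org.apache.hadoop.io); a minimal sketch, assuming only the
ParameterMessage constructor and accessors already used above:

    // serialize into an in-memory buffer instead of an HDFS file
    ParameterMessage message = new ParameterMessage(0.22, false, matrices, null);
    DataOutputBuffer out = new DataOutputBuffer();
    message.write(out);

    // deserialize from the same bytes
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    ParameterMessage read = new ParameterMessage(0, false, null, null);
    read.readFields(in);

    assertEquals(0.22, read.getTrainingError(), 0.000001);
    assertNull(read.getPrevMatrices());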
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java b/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
new file mode 100644
index 0000000..fd24c4f
--- /dev/null
+++ b/src/test/java/org/apache/horn/examples/MultiLayerPerceptronTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.examples;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.core.HornJob;
+import org.apache.horn.core.LayeredNeuralNetwork;
+
+/**
+ * Test the functionality of the NeuralNetwork example.
+ */
+public class MultiLayerPerceptronTest extends HamaCluster {
+ private HamaConfiguration conf;
+ private FileSystem fs;
+ private String MODEL_PATH = "/tmp/neuralnets.model";
+ private String RESULT_PATH = "/tmp/neuralnets.txt";
+ private String SEQTRAIN_DATA = "/tmp/test-neuralnets.data";
+
+ public MultiLayerPerceptronTest() {
+ conf = new HamaConfiguration();
+ conf.set("bsp.master.address", "localhost");
+ conf.setBoolean("hama.child.redirect.log.console", true);
+ conf.setBoolean("hama.messenger.runtime.compression", true);
+ assertEquals("Make sure master addr is set to localhost:", "localhost",
+ conf.get("bsp.master.address"));
+ conf.set("bsp.local.dir", "/tmp/hama-test");
+ conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+ conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+ conf.set("hama.sync.client.class",
+ org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+ .getCanonicalName());
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ fs = FileSystem.get(conf);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ public void testNeuralnetsLabeling() throws IOException {
+ this.neuralNetworkTraining();
+
+ String featureDataPath = "src/test/resources/neuralnets_classification_test.txt";
+ try {
+ LayeredNeuralNetwork ann = new LayeredNeuralNetwork(conf,
+ MODEL_PATH);
+
+ // process the data in a streaming fashion
+ FileSystem fs = FileSystem.get(new URI(featureDataPath), conf);
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ fs.open(new Path(featureDataPath))));
+
+ String line = null;
+
+ // compare results with ground-truth
+ BufferedReader groundTruthReader = new BufferedReader(new FileReader(
+ "src/test/resources/neuralnets_classification_label.txt"));
+
+ double correct = 0;
+ int samples = 0;
+ while ((line = br.readLine()) != null) {
+ if (line.trim().length() == 0) {
+ continue;
+ }
+ String[] tokens = line.trim().split(",");
+ double[] vals = new double[tokens.length];
+ for (int i = 0; i < tokens.length; ++i) {
+ vals[i] = Double.parseDouble(tokens[i]);
+ }
+ DoubleVector instance = new DenseDoubleVector(vals);
+ DoubleVector result = ann.getOutput(instance);
+ double actual = result.toArray()[0];
+ double expected = Double.parseDouble(groundTruthReader.readLine());
+
+ LOG.info("evaluated: " + actual + ", expected: " + expected);
+ if (actual < 0.5 && expected < 0.5 || actual >= 0.5 && expected >= 0.5) {
+ ++correct;
+ }
+ samples++;
+ }
+
+ groundTruthReader.close();
+ br.close();
+
+ LOG.info("## Precision: " + (correct / samples));
+ assertTrue((correct / samples) > 0.5);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ fs.delete(new Path(RESULT_PATH), true);
+ fs.delete(new Path(MODEL_PATH), true);
+ fs.delete(new Path(SEQTRAIN_DATA), true);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ private void neuralNetworkTraining() {
+ String strTrainingDataPath = "src/test/resources/neuralnets_classification_training.txt";
+ int featureDimension = 8;
+ int labelDimension = 1;
+
+ Path sequenceTrainingDataPath = new Path(SEQTRAIN_DATA);
+ try {
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+ sequenceTrainingDataPath, LongWritable.class, VectorWritable.class);
+ BufferedReader br = new BufferedReader(
+ new FileReader(strTrainingDataPath));
+ String line = null;
+ // convert the data into sequence file format
+ while ((line = br.readLine()) != null) {
+ String[] tokens = line.split(",");
+ double[] vals = new double[tokens.length];
+ for (int i = 0; i < tokens.length; ++i) {
+ vals[i] = Double.parseDouble(tokens[i]);
+ }
+ writer.append(new LongWritable(), new VectorWritable(
+ new DenseDoubleVector(vals)));
+ }
+ writer.close();
+ br.close();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+
+ try {
+ HornJob ann = MultiLayerPerceptron.createJob(conf, MODEL_PATH,
+ SEQTRAIN_DATA, 0.4, 0.2, 0.01, featureDimension, labelDimension,
+ 1000, 2);
+
+ long startTime = System.currentTimeMillis();
+ if (ann.waitForCompletion(true)) {
+ LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime)
+ / 1000.0 + " seconds");
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
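
Note that neuralNetworkTraining() above converts the CSV training data into a
SequenceFile before submitting the job. A quick way to sanity-check that
conversion is to read the file back with SequenceFile.Reader; a short sketch
in the same (deprecated) constructor style as the writer above:

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, sequenceTrainingDataPath, conf);
    LongWritable key = new LongWritable();
    VectorWritable value = new VectorWritable();
    int rows = 0;
    while (reader.next(key, value)) {
      rows++; // each value holds [features..., label] as one dense vector
    }
    reader.close();
    LOG.info("converted " + rows + " training rows");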
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java b/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java
deleted file mode 100644
index 932b17a..0000000
--- a/src/test/java/org/apache/horn/examples/NeuralNetworkTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.examples;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaCluster;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.horn.bsp.HornJob;
-import org.apache.horn.bsp.SmallLayeredNeuralNetwork;
-
-/**
- * Test the functionality of NeuralNetwork Example.
- *
- */
-public class NeuralNetworkTest extends HamaCluster {
- private HamaConfiguration conf;
- private FileSystem fs;
- private String MODEL_PATH = "/tmp/neuralnets.model";
- private String RESULT_PATH = "/tmp/neuralnets.txt";
- private String SEQTRAIN_DATA = "/tmp/test-neuralnets.data";
-
- public NeuralNetworkTest() {
- conf = new HamaConfiguration();
- conf.set("bsp.master.address", "localhost");
- conf.setBoolean("hama.child.redirect.log.console", true);
- conf.setBoolean("hama.messenger.runtime.compression", true);
- assertEquals("Make sure master addr is set to localhost:", "localhost",
- conf.get("bsp.master.address"));
- conf.set("bsp.local.dir", "/tmp/hama-test");
- conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
- conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
- conf.set("hama.sync.client.class",
- org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
- .getCanonicalName());
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- fs = FileSystem.get(conf);
- }
-
- @Override
- public void tearDown() throws Exception {
- super.tearDown();
- }
-
- public void testNeuralnetsLabeling() throws IOException {
- this.neuralNetworkTraining();
-
- String featureDataPath = "src/test/resources/neuralnets_classification_test.txt";
- try {
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(conf,
- MODEL_PATH);
-
- // process data in streaming approach
- FileSystem fs = FileSystem.get(new URI(featureDataPath), conf);
- BufferedReader br = new BufferedReader(new InputStreamReader(
- fs.open(new Path(featureDataPath))));
- Path outputPath = new Path(RESULT_PATH);
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
- fs.create(outputPath)));
-
- String line = null;
-
- while ((line = br.readLine()) != null) {
- if (line.trim().length() == 0) {
- continue;
- }
- String[] tokens = line.trim().split(",");
- double[] vals = new double[tokens.length];
- for (int i = 0; i < tokens.length; ++i) {
- vals[i] = Double.parseDouble(tokens[i]);
- }
- DoubleVector instance = new DenseDoubleVector(vals);
- DoubleVector result = ann.getOutput(instance);
- double[] arrResult = result.toArray();
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < arrResult.length; ++i) {
- sb.append(arrResult[i]);
- if (i != arrResult.length - 1) {
- sb.append(",");
- } else {
- sb.append("\n");
- }
- }
- bw.write(sb.toString());
- }
-
- br.close();
- bw.close();
-
- // compare results with ground-truth
- BufferedReader groundTruthReader = new BufferedReader(new FileReader(
- "src/test/resources/neuralnets_classification_label.txt"));
- List<Double> groundTruthList = new ArrayList<Double>();
- line = null;
- while ((line = groundTruthReader.readLine()) != null) {
- groundTruthList.add(Double.parseDouble(line));
- }
- groundTruthReader.close();
-
- BufferedReader resultReader = new BufferedReader(new FileReader(
- RESULT_PATH));
- List<Double> resultList = new ArrayList<Double>();
- while ((line = resultReader.readLine()) != null) {
- resultList.add(Double.parseDouble(line));
- }
- resultReader.close();
- int total = resultList.size();
- double correct = 0;
- for (int i = 0; i < groundTruthList.size(); ++i) {
- double actual = resultList.get(i);
- double expected = groundTruthList.get(i);
- LOG.info("evaluated: " + actual + ", expected: " + expected);
- if (actual < 0.5 && expected < 0.5 || actual >= 0.5 && expected >= 0.5) {
- ++correct;
- }
- }
-
- LOG.info("## Precision: " + (correct / total));
- assertTrue((correct / total) > 0.5);
-
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- fs.delete(new Path(RESULT_PATH), true);
- fs.delete(new Path(MODEL_PATH), true);
- fs.delete(new Path(SEQTRAIN_DATA), true);
- }
- }
-
- @SuppressWarnings("deprecation")
- private void neuralNetworkTraining() {
- String strTrainingDataPath = "src/test/resources/neuralnets_classification_training.txt";
- int featureDimension = 8;
- int labelDimension = 1;
-
- Path sequenceTrainingDataPath = new Path(SEQTRAIN_DATA);
- try {
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
- sequenceTrainingDataPath, LongWritable.class, VectorWritable.class);
- BufferedReader br = new BufferedReader(
- new FileReader(strTrainingDataPath));
- String line = null;
- // convert the data in sequence file format
- while ((line = br.readLine()) != null) {
- String[] tokens = line.split(",");
- double[] vals = new double[tokens.length];
- for (int i = 0; i < tokens.length; ++i) {
- vals[i] = Double.parseDouble(tokens[i]);
- }
- writer.append(new LongWritable(), new VectorWritable(
- new DenseDoubleVector(vals)));
- }
- writer.close();
- br.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
-
- try {
- HornJob ann = MultiLayerPerceptron.createJob(conf, MODEL_PATH,
- SEQTRAIN_DATA, 0.4, 0.2, 0.01, featureDimension, labelDimension,
- 1000, 2);
-
- long startTime = System.currentTimeMillis();
- if (ann.waitForCompletion(true)) {
- LOG.info("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/trainer/TestNeuron.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/trainer/TestNeuron.java b/src/test/java/org/apache/horn/trainer/TestNeuron.java
deleted file mode 100644
index b5f6bfc..0000000
--- a/src/test/java/org/apache/horn/trainer/TestNeuron.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.trainer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hama.HamaConfiguration;
-import org.apache.horn.bsp.Neuron;
-import org.apache.horn.bsp.Synapse;
-import org.apache.horn.funcs.Sigmoid;
-
-public class TestNeuron extends TestCase {
- private static double learningRate = 0.1;
- private static double bias = -1;
- private static double theta = 0.8;
-
- public static class MyNeuron extends
- Neuron<Synapse<DoubleWritable, DoubleWritable>> {
-
- @Override
- public void setup(HamaConfiguration conf) {
- }
-
- @Override
- public void forward(
- Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
- throws IOException {
- double sum = 0;
- for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
- sum += m.getInput() * m.getWeight();
- }
- sum += (bias * theta);
- this.feedforward(new Sigmoid().apply(sum));
- }
-
- @Override
- public void backward(
- Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
- throws IOException {
- for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
- // Calculates error gradient for each neuron
- double gradient = new Sigmoid().applyDerivative(this.getOutput()) * (m.getDelta() * m.getWeight());
-
- // Propagates to lower layer
- backpropagate(gradient);
-
- // Weight corrections
- double weight = learningRate * this.getOutput() * m.getDelta();
- this.push(weight);
- }
- }
-
- }
-
- public void testProp() throws IOException {
- List<Synapse<DoubleWritable, DoubleWritable>> x = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
- x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
- 1.0), new DoubleWritable(0.5)));
- x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
- 1.0), new DoubleWritable(0.4)));
-
- MyNeuron n = new MyNeuron();
- n.forward(x);
- assertEquals(0.5249791874789399, n.getOutput());
-
- x.clear();
- x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
- -0.1274), new DoubleWritable(-1.2)));
- n.backward(x);
- assertEquals(-0.006688234848481696, n.getUpdate());
- }
-
-}
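
The expected constants in the removed TestNeuron are reproducible by hand,
which makes the forward/backward contract easier to follow: the forward pass
sums the weighted inputs plus the bias contribution and applies the sigmoid,
and the backward pass scales the delta by the learning rate and the neuron's
output:

    double sum = 1.0 * 0.5 + 1.0 * 0.4 + (-1) * 0.8;  // bias * theta; sum = 0.1
    double output = 1.0 / (1.0 + Math.exp(-sum));     // = 0.5249791874789399
    double update = 0.1 * output * -0.1274;           // learningRate * output * delta
                                                      // = -0.006688234848481696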
[3/4] incubator-horn git commit: Code refactoring
Posted by ed...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
new file mode 100644
index 0000000..f87e771
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/AbstractLayeredNeuralNetwork.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.math.DoubleDoubleFunction;
+import org.apache.hama.commons.math.DoubleFunction;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.funcs.FunctionFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * AbstractLayeredNeuralNetwork defines the general operations for layered
+ * models, including Linear Regression, Logistic Regression, Multilayer
+ * Perceptron, Autoencoder, Restricted Boltzmann Machine, etc.
+ *
+ * In general, these models consist of neurons arranged in layers. The neurons
+ * of any two adjacent layers are connected, forming a bipartite weighted
+ * graph.
+ *
+ */
+abstract class AbstractLayeredNeuralNetwork extends AbstractNeuralNetwork {
+
+ private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1;
+
+ double trainingError;
+
+ /* The momentum weight */
+ protected double momentumWeight;
+
+ /* The cost function of the model */
+ protected DoubleDoubleFunction costFunction;
+
+ /* Record the size of each layer */
+ protected List<Integer> layerSizeList;
+
+ protected TrainingMethod trainingMethod;
+
+ protected LearningStyle learningStyle;
+
+ public static enum TrainingMethod {
+ GRADIENT_DESCENT
+ }
+
+ public static enum LearningStyle {
+ UNSUPERVISED,
+ SUPERVISED
+ }
+
+ public AbstractLayeredNeuralNetwork() {
+ this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT;
+ this.trainingMethod = TrainingMethod.GRADIENT_DESCENT;
+ this.learningStyle = LearningStyle.SUPERVISED;
+ }
+
+ public AbstractLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
+ super(conf, modelPath);
+ }
+
+ public void setTrainingMethod(TrainingMethod method) {
+ this.trainingMethod = method;
+ }
+
+ public TrainingMethod getTrainingMethod() {
+ return this.trainingMethod;
+ }
+
+ public void setLearningStyle(LearningStyle style) {
+ this.learningStyle = style;
+ }
+
+ public LearningStyle getLearningStyle() {
+ return this.learningStyle;
+ }
+
+ /**
+ * Set the cost function for the model.
+ *
+ * @param costFunction The cost function to use.
+ */
+ public void setCostFunction(DoubleDoubleFunction costFunction) {
+ this.costFunction = costFunction;
+ }
+
+ /**
+ * Add a layer of neurons with the specified size. If the added layer is not
+ * the first layer, its neurons are automatically connected to those of the
+ * previous layer.
+ *
+ * @param size The number of neurons in the layer.
+ * @param isFinalLayer If false, a bias neuron is added.
+ * @param squashingFunction The squashing function for this layer; the input
+ * layer uses f(x) = x by default.
+ * @return The layer index, starting from 0.
+ */
+ public abstract int addLayer(int size, boolean isFinalLayer,
+ DoubleFunction squashingFunction);
+
+ /**
+ * Get the size of a particular layer.
+ *
+ * @param layer The layer index.
+ * @return The layer size.
+ */
+ public int getLayerSize(int layer) {
+ Preconditions.checkArgument(
+ layer >= 0 && layer < this.layerSizeList.size(),
+ String.format("Input must be in range [0, %d]\n",
+ this.layerSizeList.size() - 1));
+ return this.layerSizeList.get(layer);
+ }
+
+ /**
+ * Get the layer size list.
+ *
+ * @return The layer size list.
+ */
+ protected List<Integer> getLayerSizeList() {
+ return this.layerSizeList;
+ }
+
+ /**
+ * Get the weights between layers layerIdx and layerIdx + 1.
+ *
+ * @param layerIdx The index of the layer.
+ * @return The weights in the form of a {@link DoubleMatrix}.
+ */
+ public abstract DoubleMatrix getWeightsByLayer(int layerIdx);
+
+ /**
+ * Compute the weight updates using one training instance.
+ *
+ * @param trainingInstance The concatenation of the feature vector and the
+ * class label vector.
+ * @return The update for each weight, in the form of a matrix list.
+ */
+ public abstract DoubleMatrix[] trainByInstance(DoubleVector trainingInstance);
+
+ /**
+ * Get the output calculated by the model.
+ *
+ * @param instance The feature instance.
+ * @return a new vector with the result of the operation.
+ */
+ public abstract DoubleVector getOutput(DoubleVector instance);
+
+ /**
+ * Calculate the training error based on the labels and outputs.
+ *
+ * @param labels The ground-truth labels.
+ * @param output The output computed by the model.
+ */
+ protected abstract void calculateTrainingError(DoubleVector labels,
+ DoubleVector output);
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ super.readFields(input);
+ // read momentum weight
+ this.momentumWeight = input.readDouble();
+
+ // read cost function
+ this.costFunction = FunctionFactory
+ .createDoubleDoubleFunction(WritableUtils.readString(input));
+
+ // read layer size list
+ int numLayers = input.readInt();
+ this.layerSizeList = Lists.newArrayList();
+ for (int i = 0; i < numLayers; ++i) {
+ this.layerSizeList.add(input.readInt());
+ }
+
+ this.trainingMethod = WritableUtils.readEnum(input, TrainingMethod.class);
+ this.learningStyle = WritableUtils.readEnum(input, LearningStyle.class);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ super.write(output);
+ // write momentum weight
+ output.writeDouble(this.momentumWeight);
+
+ // write cost function
+ WritableUtils.writeString(output, costFunction.getFunctionName());
+
+ // write layer size list
+ output.writeInt(this.layerSizeList.size());
+ for (Integer layerSize : this.layerSizeList) {
+ output.writeInt(layerSize);
+ }
+
+ WritableUtils.writeEnum(output, this.trainingMethod);
+ WritableUtils.writeEnum(output, this.learningStyle);
+ }
+
+}
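
The bipartite weighted graph described in the class javadoc corresponds to one
weight matrix per pair of adjacent layers; in the tests, a 2-neuron input
layer plus a bias unit feeding a 5-neuron layer yields a 5x3 matrix. A single
forward step then looks like the following hand-rolled sketch (plain arrays
rather than the Hama matrix API, and assuming the bias weight occupies the
first column, which the diff itself does not pin down):

    // weights[i][0] is the bias weight of next-layer neuron i;
    // weights[i][j + 1] connects input neuron j to next-layer neuron i
    static double[] forwardLayer(double[][] weights, double[] input) {
      double[] next = new double[weights.length];
      for (int i = 0; i < weights.length; ++i) {
        double sum = weights[i][0]; // bias input fixed at 1
        for (int j = 0; j < input.length; ++j) {
          sum += weights[i][j + 1] * input[j];
        }
        next[i] = 1.0 / (1.0 + Math.exp(-sum)); // sigmoid squashing
      }
      return next;
    }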
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java b/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
new file mode 100644
index 0000000..45f56a3
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/AbstractNeuralNetwork.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.ml.util.DefaultFeatureTransformer;
+import org.apache.hama.ml.util.FeatureTransformer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+
+/**
+ * AbstractNeuralNetwork defines the general operations for all derived
+ * models. Typically, derived models such as Linear Regression, Logistic
+ * Regression, and Multilayer Perceptron consist of neurons and the weights
+ * between neurons.
+ *
+ */
+public abstract class AbstractNeuralNetwork implements Writable {
+ protected HamaConfiguration conf;
+ protected FileSystem fs;
+
+ private static final double DEFAULT_LEARNING_RATE = 0.5;
+
+ protected double learningRate;
+ protected boolean learningRateDecay = false;
+
+ // the name of the model
+ protected String modelType;
+ // the path to store the model
+ protected String modelPath;
+
+ protected FeatureTransformer featureTransformer;
+
+ public AbstractNeuralNetwork() {
+ this.learningRate = DEFAULT_LEARNING_RATE;
+ this.modelType = this.getClass().getSimpleName();
+ this.featureTransformer = new DefaultFeatureTransformer();
+ }
+
+ public AbstractNeuralNetwork(String modelPath) {
+ this.modelPath = modelPath;
+ }
+
+ public AbstractNeuralNetwork(HamaConfiguration conf, String modelPath) {
+ try {
+ this.conf = conf;
+ this.fs = FileSystem.get(conf);
+ this.modelPath = modelPath;
+
+ this.readFromModel();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+  /**
+   * Set whether the learning rate should decay during training. Note that,
+   * despite the accessor-style name, this method is a setter.
+   */
+  public void isLearningRateDecay(boolean decay) {
+    this.learningRateDecay = decay;
+  }
+
+ public String getModelType() {
+ return this.modelType;
+ }
+
+ /**
+   * Train the model with the given configuration, which carries the path of
+   * the training data and the training parameters.
+   *
+   * @param conf The configuration of the training job.
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ * @throws IOException
+ */
+ public BSPJob train(Configuration conf) throws ClassNotFoundException, IOException, InterruptedException {
+ Preconditions.checkArgument(this.modelPath != null,
+ "Please set the model path before training.");
+
+ // train with BSP job
+ return trainInternal((HamaConfiguration) conf);
+ }
+
+ /**
+   * Submit the BSP job that trains the model.
+ */
+ protected abstract BSPJob trainInternal(HamaConfiguration hamaConf)
+ throws IOException, InterruptedException, ClassNotFoundException;
+
+ /**
+ * Read the model meta-data from the specified location.
+ *
+ * @throws IOException
+ */
+ protected void readFromModel() throws IOException {
+ Preconditions.checkArgument(this.modelPath != null,
+ "Model path has not been set.");
+    FSDataInputStream is = fs.open(new Path(modelPath));
+ this.readFields(is);
+ Closeables.close(is, false);
+ }
+
+ /**
+   * Write the model data to the specified location.
+ *
+ * @throws IOException
+ */
+ public void writeModelToFile() throws IOException {
+ Preconditions.checkArgument(this.modelPath != null,
+ "Model path has not been set.");
+
+    FSDataOutputStream os = fs.create(new Path(this.modelPath), true);
+    this.write(os);
+
+    Closeables.close(os, false);
+ }
+
+ /**
+ * Set the model path.
+ *
+ * @param modelPath
+ */
+ public void setModelPath(String modelPath) {
+ this.modelPath = modelPath;
+ }
+
+ /**
+ * Get the model path.
+ *
+ * @return the path to store the model.
+ */
+ public String getModelPath() {
+ return this.modelPath;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ // read model type
+ this.modelType = WritableUtils.readString(input);
+ // read learning rate
+ this.learningRate = input.readDouble();
+ // read model path
+ this.modelPath = WritableUtils.readString(input);
+
+ if (this.modelPath.equals("null")) {
+ this.modelPath = null;
+ }
+
+ // read feature transformer
+ int bytesLen = input.readInt();
+ byte[] featureTransformerBytes = new byte[bytesLen];
+ for (int i = 0; i < featureTransformerBytes.length; ++i) {
+ featureTransformerBytes[i] = input.readByte();
+ }
+
+ Class<? extends FeatureTransformer> featureTransformerCls = (Class<? extends FeatureTransformer>) SerializationUtils
+ .deserialize(featureTransformerBytes);
+
+ Constructor[] constructors = featureTransformerCls
+ .getDeclaredConstructors();
+ Constructor constructor = constructors[0];
+
+ try {
+ this.featureTransformer = (FeatureTransformer) constructor
+ .newInstance(new Object[] {});
+ } catch (InstantiationException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ } catch (IllegalArgumentException e) {
+ e.printStackTrace();
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ // write model type
+ WritableUtils.writeString(output, modelType);
+ // write learning rate
+ output.writeDouble(learningRate);
+ // write model path
+ if (this.modelPath != null) {
+ WritableUtils.writeString(output, modelPath);
+ } else {
+ WritableUtils.writeString(output, "null");
+ }
+
+ // serialize the class
+ Class<? extends FeatureTransformer> featureTransformerCls = this.featureTransformer
+ .getClass();
+ byte[] featureTransformerBytes = SerializationUtils
+ .serialize(featureTransformerCls);
+ output.writeInt(featureTransformerBytes.length);
+ output.write(featureTransformerBytes);
+ }
+
+ public void setFeatureTransformer(FeatureTransformer featureTransformer) {
+ this.featureTransformer = featureTransformer;
+ }
+
+ public FeatureTransformer getFeatureTransformer() {
+ return this.featureTransformer;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/AbstractNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AbstractNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/core/AbstractNeuralNetworkTrainer.java
new file mode 100644
index 0000000..3547a1a
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/AbstractNeuralNetworkTrainer.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.ml.util.DefaultFeatureTransformer;
+import org.apache.hama.ml.util.FeatureTransformer;
+
+/**
+ * The trainer that trains the {@link LayeredNeuralNetwork} with BSP. The
+ * trainer reads the training data and obtains the trained parameters of the
+ * model.
+ *
+ */
+public abstract class AbstractNeuralNetworkTrainer
+ extends
+ BSP<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> {
+
+ protected static final Log LOG = LogFactory
+ .getLog(AbstractNeuralNetworkTrainer.class);
+
+ protected Configuration conf;
+ protected int maxIteration;
+ protected int batchSize;
+ protected String trainingMode;
+
+ protected FeatureTransformer featureTransformer;
+
+ @Override
+ final public void setup(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+ throws IOException, SyncException, InterruptedException {
+ conf = peer.getConfiguration();
+ featureTransformer = new DefaultFeatureTransformer();
+ this.extraSetup(peer);
+ }
+
+ /**
+ * Handle extra setup for sub-classes.
+ *
+ * @param peer
+ * @throws IOException
+ * @throws SyncException
+ * @throws InterruptedException
+ */
+ protected void extraSetup(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+ throws IOException, SyncException, InterruptedException {
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public abstract void bsp(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+ throws IOException, SyncException, InterruptedException;
+
+ @Override
+ public void cleanup(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+ throws IOException {
+ this.extraCleanup(peer);
+ // write model to modelPath
+ }
+
+ /**
+ * Handle cleanup for sub-classes. Write the trained model back.
+ *
+ * @param peer
+ * @throws IOException
+ * @throws SyncException
+ * @throws InterruptedException
+ */
+ protected void extraCleanup(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse<DoubleWritable, DoubleWritable>> peer)
+ throws IOException {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/AutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/AutoEncoder.java b/src/main/java/org/apache/horn/core/AutoEncoder.java
new file mode 100644
index 0000000..f638245
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/AutoEncoder.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleFunction;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.ml.util.FeatureTransformer;
+import org.apache.horn.funcs.FunctionFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * AutoEncoder is a model used for dimensionality reduction and feature
+ * learning. It is a special kind of {@link AbstractNeuralNetwork} that
+ * consists of three layers of neurons, where the first and third layers
+ * contain the same number of neurons.
+ *
+ */
+public class AutoEncoder {
+
+ private final LayeredNeuralNetwork model;
+
+ /**
+ * Initialize the autoencoder.
+ *
+ * @param inputDimensions The number of dimensions for the input feature.
+ * @param compressedDimensions The number of dimensions for the compressed
+ * information.
+ */
+ public AutoEncoder(int inputDimensions, int compressedDimensions) {
+ model = new LayeredNeuralNetwork();
+ model.addLayer(inputDimensions, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ model.addLayer(compressedDimensions, false,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ model.addLayer(inputDimensions, true,
+ FunctionFactory.createDoubleFunction("Sigmoid"));
+ model
+ .setLearningStyle(AbstractLayeredNeuralNetwork.LearningStyle.UNSUPERVISED);
+ model.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction("SquaredError"));
+ }
+
+ public AutoEncoder(HamaConfiguration conf, String modelPath) {
+ model = new LayeredNeuralNetwork(conf, modelPath);
+ }
+
+ public AutoEncoder setModelPath(String modelPath) {
+ model.setModelPath(modelPath);
+ return this;
+ }
+
+ /**
+   * Train the autoencoder with the given data. Note that the training data
+   * is pre-processed by the configured feature transformer before training.
+ *
+ * @param dataInputPath
+ * @param trainingParams
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public BSPJob train(HamaConfiguration conf, Path dataInputPath,
+ Map<String, String> trainingParams) throws ClassNotFoundException, IOException, InterruptedException {
+ return model.train(conf);
+ }
+
+ /**
+ * Train the model with one instance.
+ *
+ * @param trainingInstance
+ */
+ public void trainOnline(DoubleVector trainingInstance) {
+ model.trainOnline(trainingInstance);
+ }
+
+ /**
+ * Get the matrix M used to encode the input features.
+ *
+   * @return the matrix used to encode the input.
+ */
+ public DoubleMatrix getEncodeWeightMatrix() {
+ return model.getWeightsByLayer(0);
+ }
+
+ /**
+ * Get the matrix M used to decode the compressed information.
+ *
+   * @return the matrix used to decode the compressed information.
+ */
+ public DoubleMatrix getDecodeWeightMatrix() {
+ return model.getWeightsByLayer(1);
+ }
+
+ /**
+   * Transform the input instance with the weights of the given layer.
+   *
+   * @param inputInstance
+   * @param inputLayer 0 to encode, 1 to decode.
+   * @return The transformed vector.
+ */
+ private DoubleVector transform(DoubleVector inputInstance, int inputLayer) {
+ DoubleVector internalInstance = new DenseDoubleVector(
+ inputInstance.getDimension() + 1);
+ internalInstance.set(0, 1);
+ for (int i = 0; i < inputInstance.getDimension(); ++i) {
+ internalInstance.set(i + 1, inputInstance.get(i));
+ }
+ DoubleFunction squashingFunction = model.getSquashingFunction(inputLayer);
+ DoubleMatrix weightMatrix = null;
+ if (inputLayer == 0) {
+ weightMatrix = this.getEncodeWeightMatrix();
+ } else {
+ weightMatrix = this.getDecodeWeightMatrix();
+ }
+ DoubleVector vec = weightMatrix.multiplyVectorUnsafe(internalInstance);
+ vec = vec.applyToElements(squashingFunction);
+ return vec;
+ }
+
+ /**
+ * Encode the input instance.
+ *
+ * @param inputInstance
+   * @return a new vector with the encoded input instance.
+ */
+ public DoubleVector encode(DoubleVector inputInstance) {
+ Preconditions
+ .checkArgument(
+ inputInstance.getDimension() == model.getLayerSize(0) - 1,
+ String
+ .format(
+ "The dimension of input instance is %d, but the model requires dimension %d.",
+                    inputInstance.getDimension(), model.getLayerSize(0) - 1));
+ return this.transform(inputInstance, 0);
+ }
+
+ /**
+ * Decode the input instance.
+ *
+ * @param inputInstance
+   * @return a new vector with the decoded input instance.
+ */
+ public DoubleVector decode(DoubleVector inputInstance) {
+ Preconditions
+ .checkArgument(
+ inputInstance.getDimension() == model.getLayerSize(1) - 1,
+ String
+ .format(
+ "The dimension of input instance is %d, but the model requires dimension %d.",
+ inputInstance.getDimension(), model.getLayerSize(1) - 1));
+ return this.transform(inputInstance, 1);
+ }
+
+ /**
+ * Get the label(s) according to the given features.
+ *
+ * @param inputInstance
+   * @return a new vector with the output of the model for the given feature
+   *         instance.
+ */
+ public DoubleVector getOutput(DoubleVector inputInstance) {
+ return model.getOutput(inputInstance);
+ }
+
+ /**
+ * Set the feature transformer.
+ *
+ * @param featureTransformer
+ */
+ public void setFeatureTransformer(FeatureTransformer featureTransformer) {
+ this.model.setFeatureTransformer(featureTransformer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/HornJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/HornJob.java b/src/main/java/org/apache/horn/core/HornJob.java
new file mode 100644
index 0000000..82dcad8
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/HornJob.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.commons.math.Function;
+import org.apache.horn.funcs.FunctionFactory;
+
+public class HornJob extends BSPJob {
+
+ LayeredNeuralNetwork neuralNetwork;
+
+ public HornJob(HamaConfiguration conf, Class<?> exampleClass)
+ throws IOException {
+ super(conf);
+ this.setJarByClass(exampleClass);
+
+ neuralNetwork = new LayeredNeuralNetwork();
+ }
+
+ public void inputLayer(int featureDimension, Class<? extends Function> func) {
+ addLayer(featureDimension, func);
+ }
+
+ public void addLayer(int featureDimension, Class<? extends Function> func) {
+ neuralNetwork.addLayer(featureDimension, false,
+ FunctionFactory.createDoubleFunction(func.getSimpleName()));
+ }
+
+ public void outputLayer(int labels, Class<? extends Function> func) {
+ neuralNetwork.addLayer(labels, true,
+ FunctionFactory.createDoubleFunction(func.getSimpleName()));
+ }
+
+ public void setCostFunction(Class<? extends Function> func) {
+ neuralNetwork.setCostFunction(FunctionFactory
+ .createDoubleDoubleFunction(func.getSimpleName()));
+ }
+
+ public void setDouble(String name, double value) {
+ conf.setDouble(name, value);
+ }
+
+ public void setMaxIteration(int maxIteration) {
+ this.conf.setInt("training.max.iterations", maxIteration);
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.conf.setInt("training.batch.size", batchSize);
+ }
+
+ public void setLearningRate(double learningRate) {
+ this.conf.setDouble("mlp.learning.rate", learningRate);
+ }
+
+ public void setConvergenceCheckInterval(int n) {
+ this.conf.setInt("convergence.check.interval", n);
+ }
+
+ public void setMomentumWeight(double momentumWeight) {
+ this.conf.setDouble("mlp.momentum.weight", momentumWeight);
+ }
+
+ public LayeredNeuralNetwork getNeuralNetwork() {
+ return neuralNetwork;
+ }
+
+ public boolean waitForCompletion(boolean verbose) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ BSPJob job = neuralNetwork.train(this.conf);
+ if (verbose) {
+ return job.waitForCompletion(true);
+ } else {
+ return job.waitForCompletion(false);
+ }
+ }
+
+ public void setRegularizationWeight(double regularizationWeight) {
+ this.conf.setDouble("regularization.weight", regularizationWeight);
+ }
+
+ public void setModelPath(String modelPath) {
+ this.conf.set("model.path", modelPath);
+ neuralNetwork.setModelPath(modelPath);
+ }
+
+ public void setTrainingSetPath(String inputPath) {
+ this.conf.set("training.input.path", inputPath);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
new file mode 100644
index 0000000..afccbff
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetwork.java
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.commons.io.MatrixWritable;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleFunction;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.util.ReflectionUtils;
+import org.apache.horn.examples.MultiLayerPerceptron.StandardNeuron;
+import org.apache.horn.funcs.FunctionFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * LayeredNeuralNetwork defines the general operations for derivative layered
+ * models, including Linear Regression, Logistic Regression, Multilayer
+ * Perceptron, Autoencoder, Restricted Boltzmann Machine, etc. For
+ * LayeredNeuralNetwork, the training can be conducted in parallel, but the
+ * parameters of the model are assumed to be stored on a single machine.
+ *
+ * In general, these models consist of neurons which are aligned in layers.
+ * Between any two adjacent layers, the neurons are connected to form a
+ * bipartite weighted graph.
+ *
+ */
+public class LayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
+
+ private static final Log LOG = LogFactory
+ .getLog(LayeredNeuralNetwork.class);
+
+ public static Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>> neuronClass;
+
+ /* Weights between neurons at adjacent layers */
+ protected List<DoubleMatrix> weightMatrixList;
+
+ /* Previous weight updates between neurons at adjacent layers */
+ protected List<DoubleMatrix> prevWeightUpdatesList;
+
+ /* Different layers can have different squashing function */
+ protected List<DoubleFunction> squashingFunctionList;
+
+ protected int finalLayerIdx;
+
+ protected double regularizationWeight;
+
+ public LayeredNeuralNetwork() {
+ this.layerSizeList = Lists.newArrayList();
+ this.weightMatrixList = Lists.newArrayList();
+ this.prevWeightUpdatesList = Lists.newArrayList();
+ this.squashingFunctionList = Lists.newArrayList();
+ }
+
+ public LayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
+ super(conf, modelPath);
+ this.regularizationWeight = conf.getDouble("regularization.weight", 0);
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ public int addLayer(int size, boolean isFinalLayer,
+ DoubleFunction squashingFunction) {
+ Preconditions.checkArgument(size > 0,
+ "Size of layer must be larger than 0.");
+ if (!isFinalLayer) {
+ size += 1;
+ }
+
+ LOG.info("Add Layer: " + size);
+ this.layerSizeList.add(size);
+ int layerIdx = this.layerSizeList.size() - 1;
+ if (isFinalLayer) {
+ this.finalLayerIdx = layerIdx;
+ }
+
+ // add weights between current layer and previous layer, and input layer has
+ // no squashing function
+ if (layerIdx > 0) {
+ int sizePrevLayer = this.layerSizeList.get(layerIdx - 1);
+      // the row count equals the size of the current layer, and the column
+      // count equals the size of the previous layer
+ int row = isFinalLayer ? size : size - 1;
+ int col = sizePrevLayer;
+ DoubleMatrix weightMatrix = new DenseDoubleMatrix(row, col);
+ // initialize weights
+ weightMatrix.applyToElements(new DoubleFunction() {
+ @Override
+ public double apply(double value) {
+ return RandomUtils.nextDouble() - 0.5;
+ }
+
+ @Override
+ public double applyDerivative(double value) {
+ throw new UnsupportedOperationException("");
+ }
+ });
+ this.weightMatrixList.add(weightMatrix);
+ this.prevWeightUpdatesList.add(new DenseDoubleMatrix(row, col));
+ this.squashingFunctionList.add(squashingFunction);
+ }
+ return layerIdx;
+ }
+
+ /**
+ * Update the weight matrices with given matrices.
+ *
+ * @param matrices
+ */
+ public void updateWeightMatrices(DoubleMatrix[] matrices) {
+ for (int i = 0; i < matrices.length; ++i) {
+ DoubleMatrix matrix = this.weightMatrixList.get(i);
+ this.weightMatrixList.set(i, matrix.add(matrices[i]));
+ }
+ }
+
+ /**
+   * Set the previous weight update matrices.
+ *
+ * @param prevUpdates
+ */
+ void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) {
+ this.prevWeightUpdatesList.clear();
+ Collections.addAll(this.prevWeightUpdatesList, prevUpdates);
+ }
+
+ /**
+ * Add a batch of matrices onto the given destination matrices.
+ *
+ * @param destMatrices
+ * @param sourceMatrices
+ */
+ static void matricesAdd(DoubleMatrix[] destMatrices,
+ DoubleMatrix[] sourceMatrices) {
+ for (int i = 0; i < destMatrices.length; ++i) {
+ destMatrices[i] = destMatrices[i].add(sourceMatrices[i]);
+ }
+ }
+
+ /**
+ * Get all the weight matrices.
+ *
+   * @return The matrices in the form of a matrix array.
+ */
+ DoubleMatrix[] getWeightMatrices() {
+ DoubleMatrix[] matrices = new DoubleMatrix[this.weightMatrixList.size()];
+ this.weightMatrixList.toArray(matrices);
+ return matrices;
+ }
+
+ /**
+ * Set the weight matrices.
+ *
+ * @param matrices
+ */
+ public void setWeightMatrices(DoubleMatrix[] matrices) {
+ this.weightMatrixList = new ArrayList<DoubleMatrix>();
+ Collections.addAll(this.weightMatrixList, matrices);
+ }
+
+ /**
+   * Get the previous weight update matrices in the form of an array.
+   *
+   * @return The matrices in the form of a matrix array.
+ */
+ public DoubleMatrix[] getPrevMatricesUpdates() {
+ DoubleMatrix[] prevMatricesUpdates = new DoubleMatrix[this.prevWeightUpdatesList
+ .size()];
+ for (int i = 0; i < this.prevWeightUpdatesList.size(); ++i) {
+ prevMatricesUpdates[i] = this.prevWeightUpdatesList.get(i);
+ }
+ return prevMatricesUpdates;
+ }
+
+ public void setWeightMatrix(int index, DoubleMatrix matrix) {
+ Preconditions.checkArgument(
+ 0 <= index && index < this.weightMatrixList.size(), String.format(
+ "index [%d] should be in range[%d, %d].", index, 0,
+ this.weightMatrixList.size()));
+ this.weightMatrixList.set(index, matrix);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ super.readFields(input);
+
+ // read squash functions
+ int squashingFunctionSize = input.readInt();
+ this.squashingFunctionList = Lists.newArrayList();
+ for (int i = 0; i < squashingFunctionSize; ++i) {
+ this.squashingFunctionList.add(FunctionFactory
+ .createDoubleFunction(WritableUtils.readString(input)));
+ }
+
+ // read weights and construct matrices of previous updates
+ int numOfMatrices = input.readInt();
+ this.weightMatrixList = Lists.newArrayList();
+ this.prevWeightUpdatesList = Lists.newArrayList();
+ for (int i = 0; i < numOfMatrices; ++i) {
+ DoubleMatrix matrix = MatrixWritable.read(input);
+ this.weightMatrixList.add(matrix);
+ this.prevWeightUpdatesList.add(new DenseDoubleMatrix(
+ matrix.getRowCount(), matrix.getColumnCount()));
+ }
+
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ super.write(output);
+
+ // write squashing functions
+ output.writeInt(this.squashingFunctionList.size());
+ for (DoubleFunction aSquashingFunctionList : this.squashingFunctionList) {
+ WritableUtils.writeString(output,
+ aSquashingFunctionList.getFunctionName());
+ }
+
+ // write weight matrices
+ output.writeInt(this.weightMatrixList.size());
+ for (DoubleMatrix aWeightMatrixList : this.weightMatrixList) {
+ MatrixWritable.write(aWeightMatrixList, output);
+ }
+
+ // DO NOT WRITE WEIGHT UPDATE
+ }
+
+ @Override
+ public DoubleMatrix getWeightsByLayer(int layerIdx) {
+ return this.weightMatrixList.get(layerIdx);
+ }
+
+ /**
+   * Get the output of the model according to the given feature instance.
+ */
+ @Override
+ public DoubleVector getOutput(DoubleVector instance) {
+ Preconditions.checkArgument(this.layerSizeList.get(0) - 1 == instance
+ .getDimension(), String.format(
+ "The dimension of input instance should be %d.",
+ this.layerSizeList.get(0) - 1));
+ // transform the features to another space
+ DoubleVector transformedInstance = this.featureTransformer
+ .transform(instance);
+ // add bias feature
+ DoubleVector instanceWithBias = new DenseDoubleVector(
+ transformedInstance.getDimension() + 1);
+ instanceWithBias.set(0, 0.99999); // set bias to be a little bit less than
+ // 1.0
+ for (int i = 1; i < instanceWithBias.getDimension(); ++i) {
+ instanceWithBias.set(i, transformedInstance.get(i - 1));
+ }
+
+ List<DoubleVector> outputCache = getOutputInternal(instanceWithBias);
+ // return the output of the last layer
+ DoubleVector result = outputCache.get(outputCache.size() - 1);
+ // remove bias
+ return result.sliceUnsafe(1, result.getDimension() - 1);
+ }
+
+ /**
+   * Calculate the output internally; the intermediate output of each layer
+   * will be stored in the returned cache.
+ *
+ * @param instanceWithBias The instance contains the features.
+ * @return Cached output of each layer.
+ */
+ public List<DoubleVector> getOutputInternal(DoubleVector instanceWithBias) {
+ List<DoubleVector> outputCache = new ArrayList<DoubleVector>();
+ // fill with instance
+ DoubleVector intermediateOutput = instanceWithBias;
+ outputCache.add(intermediateOutput);
+
+ for (int i = 0; i < this.layerSizeList.size() - 1; ++i) {
+ intermediateOutput = forward(i, intermediateOutput);
+ outputCache.add(intermediateOutput);
+ }
+
+ return outputCache;
+ }
+
+ /**
+ * @return a new neuron instance
+ */
+ public static Neuron<Synapse<DoubleWritable, DoubleWritable>> newNeuronInstance() {
+ return (Neuron<Synapse<DoubleWritable, DoubleWritable>>) ReflectionUtils
+ .newInstance(neuronClass);
+ }
+
+ /**
+ * Forward the calculation for one layer.
+ *
+ * @param fromLayer The index of the previous layer.
+ * @param intermediateOutput The intermediateOutput of previous layer.
+   * @return The output of the layer, with the bias term prepended.
+ */
+ @SuppressWarnings("unchecked")
+ protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) {
+ DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
+
+ neuronClass = (Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>>) conf
+ .getClass("neuron.class", Neuron.class);
+
+ // TODO use the multithread processing
+ DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
+ for (int row = 0; row < weightMatrix.getRowCount(); row++) {
+ List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
+ for (int col = 0; col < weightMatrix.getColumnCount(); col++) {
+ msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
+ new DoubleWritable(intermediateOutput.get(col)),
+ new DoubleWritable(weightMatrix.get(row, col))));
+ }
+ Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
+ Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
+ n.setup(conf);
+ n.setSquashingFunction(this.squashingFunctionList.get(fromLayer));
+ try {
+ n.forward(iterable);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ vec.set(row, n.getOutput());
+ }
+
+ // add bias
+ DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
+ vecWithBias.set(0, 1);
+ for (int i = 0; i < vec.getDimension(); ++i) {
+ vecWithBias.set(i + 1, vec.get(i));
+ }
+
+ return vecWithBias;
+ }
+
+ /**
+ * Train the model online.
+ *
+ * @param trainingInstance
+ */
+ public void trainOnline(DoubleVector trainingInstance) {
+ DoubleMatrix[] updateMatrices = this.trainByInstance(trainingInstance);
+ this.updateWeightMatrices(updateMatrices);
+ }
+
+ @Override
+ public DoubleMatrix[] trainByInstance(DoubleVector trainingInstance) {
+ DoubleVector transformedVector = this.featureTransformer
+ .transform(trainingInstance.sliceUnsafe(this.layerSizeList.get(0) - 1));
+
+ int inputDimension = this.layerSizeList.get(0) - 1;
+ int outputDimension;
+ DoubleVector inputInstance = null;
+ DoubleVector labels = null;
+ if (this.learningStyle == LearningStyle.SUPERVISED) {
+ outputDimension = this.layerSizeList.get(this.layerSizeList.size() - 1);
+ // validate training instance
+ Preconditions.checkArgument(
+ inputDimension + outputDimension == trainingInstance.getDimension(),
+ String
+ .format(
+ "The dimension of training instance is %d, but requires %d.",
+ trainingInstance.getDimension(), inputDimension
+ + outputDimension));
+
+ inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
+ inputInstance.set(0, 1); // add bias
+ // get the features from the transformed vector
+ for (int i = 0; i < inputDimension; ++i) {
+ inputInstance.set(i + 1, transformedVector.get(i));
+ }
+ // get the labels from the original training instance
+ labels = trainingInstance.sliceUnsafe(inputInstance.getDimension() - 1,
+ trainingInstance.getDimension() - 1);
+ } else if (this.learningStyle == LearningStyle.UNSUPERVISED) {
+ // labels are identical to input features
+ outputDimension = inputDimension;
+ // validate training instance
+ Preconditions.checkArgument(inputDimension == trainingInstance
+ .getDimension(), String.format(
+ "The dimension of training instance is %d, but requires %d.",
+ trainingInstance.getDimension(), inputDimension));
+
+ inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
+ inputInstance.set(0, 1); // add bias
+ // get the features from the transformed vector
+ for (int i = 0; i < inputDimension; ++i) {
+ inputInstance.set(i + 1, transformedVector.get(i));
+ }
+ // get the labels by copying the transformed vector
+ labels = transformedVector.deepCopy();
+ }
+
+ List<DoubleVector> internalResults = this.getOutputInternal(inputInstance);
+ DoubleVector output = internalResults.get(internalResults.size() - 1);
+
+ // get the training error
+ calculateTrainingError(labels,
+ output.deepCopy().sliceUnsafe(1, output.getDimension() - 1));
+
+ if (this.trainingMethod.equals(TrainingMethod.GRADIENT_DESCENT)) {
+ return this.trainByInstanceGradientDescent(labels, internalResults);
+ } else {
+      throw new IllegalArgumentException("Training method is not supported.");
+ }
+ }
+
+ /**
+ * Train by gradient descent. Get the updated weights using one training
+ * instance.
+ *
+   * @param labels The labels of the training instance.
+   * @param internalResults The cached output of each layer.
+ * @return The weight update matrices.
+ */
+ private DoubleMatrix[] trainByInstanceGradientDescent(DoubleVector labels,
+ List<DoubleVector> internalResults) {
+
+ DoubleVector output = internalResults.get(internalResults.size() - 1);
+ // initialize weight update matrices
+ DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.weightMatrixList
+ .size()];
+ for (int m = 0; m < weightUpdateMatrices.length; ++m) {
+ weightUpdateMatrices[m] = new DenseDoubleMatrix(this.weightMatrixList
+ .get(m).getRowCount(), this.weightMatrixList.get(m).getColumnCount());
+ }
+ DoubleVector deltaVec = new DenseDoubleVector(
+ this.layerSizeList.get(this.layerSizeList.size() - 1));
+
+ DoubleFunction squashingFunction = this.squashingFunctionList
+ .get(this.squashingFunctionList.size() - 1);
+
+ DoubleMatrix lastWeightMatrix = this.weightMatrixList
+ .get(this.weightMatrixList.size() - 1);
+ for (int i = 0; i < deltaVec.getDimension(); ++i) {
+ double costFuncDerivative = this.costFunction.applyDerivative(
+ labels.get(i), output.get(i + 1));
+ // add regularization
+ costFuncDerivative += this.regularizationWeight
+ * lastWeightMatrix.getRowVector(i).sum();
+ deltaVec.set(
+ i,
+ costFuncDerivative
+ * squashingFunction.applyDerivative(output.get(i + 1)));
+ }
+
+ // start from previous layer of output layer
+ for (int layer = this.layerSizeList.size() - 2; layer >= 0; --layer) {
+ output = internalResults.get(layer);
+ deltaVec = backpropagate(layer, deltaVec, internalResults,
+ weightUpdateMatrices[layer]);
+ }
+
+ this.setPrevWeightMatrices(weightUpdateMatrices);
+
+ return weightUpdateMatrices;
+ }
+
+ /**
+   * Back-propagate the errors from the next layer to the current layer. The
+   * weight update information is accumulated in weightUpdateMatrix, and the
+   * delta of the current layer is returned.
+   *
+   * @param curLayerIdx Index of the current layer.
+   * @param nextLayerDelta Delta of the next layer.
+   * @param outputCache Cached output of each layer.
+   * @param weightUpdateMatrix Destination matrix for the weight updates.
+   * @return The delta of the current layer.
+ */
+ private DoubleVector backpropagate(int curLayerIdx,
+ DoubleVector nextLayerDelta, List<DoubleVector> outputCache,
+ DenseDoubleMatrix weightUpdateMatrix) {
+
+ // get layer related information
+ DoubleVector curLayerOutput = outputCache.get(curLayerIdx);
+ DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
+ DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
+
+ // next layer is not output layer, remove the delta of bias neuron
+ if (curLayerIdx != this.layerSizeList.size() - 2) {
+ nextLayerDelta = nextLayerDelta.slice(1,
+ nextLayerDelta.getDimension() - 1);
+ }
+
+ // DoubleMatrix transposed = weightMatrix.transpose();
+ DoubleVector deltaVector = new DenseDoubleVector(
+ weightMatrix.getColumnCount());
+ for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
+ Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
+ // calls setup method
+ n.setup(conf);
+ n.setSquashingFunction(this.squashingFunctionList.get(curLayerIdx));
+ n.setOutput(curLayerOutput.get(row));
+
+ List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
+
+ n.setWeightVector(weightMatrix.getRowCount());
+
+ for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
+ // sum += (transposed.get(row, col) * nextLayerDelta.get(col));
+ msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
+ new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
+ weightMatrix.get(col, row)), new DoubleWritable(
+ prevWeightMatrix.get(col, row))));
+ }
+
+ Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
+ try {
+ n.backward(iterable);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ // update weights
+ weightUpdateMatrix.setColumn(row, n.getWeights());
+ deltaVector.set(row, n.getDelta());
+ }
+
+ return deltaVector;
+ }
+
+ @Override
+ protected BSPJob trainInternal(HamaConfiguration hamaConf)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ this.conf = hamaConf;
+ this.fs = FileSystem.get(conf);
+
+ String modelPath = conf.get("model.path");
+ if (modelPath != null) {
+ this.modelPath = modelPath;
+ }
+ // modelPath must be set before training
+ if (this.modelPath == null) {
+ throw new IllegalArgumentException(
+ "Please specify the modelPath for model, "
+ + "either through setModelPath() or add 'modelPath' to the training parameters.");
+ }
+ this.writeModelToFile();
+
+ // create job
+ BSPJob job = new BSPJob(conf, LayeredNeuralNetworkTrainer.class);
+ job.setJobName("Small scale Neural Network training");
+ job.setJarByClass(LayeredNeuralNetworkTrainer.class);
+ job.setBspClass(LayeredNeuralNetworkTrainer.class);
+
+ job.getConfiguration().setClass("neuron.class", StandardNeuron.class,
+ Neuron.class);
+
+ // additional for parameter server
+ // TODO at this moment, we use 1 task as a parameter server
+    // In the future, the number of parameter servers should be configurable
+ job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
+
+ job.setInputPath(new Path(conf.get("training.input.path")));
+ job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
+ job.setInputKeyClass(LongWritable.class);
+ job.setInputValueClass(VectorWritable.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
+
+ return job;
+ }
+
+ @Override
+ protected void calculateTrainingError(DoubleVector labels, DoubleVector output) {
+ DoubleVector errors = labels.deepCopy().applyToElements(output,
+ this.costFunction);
+ this.trainingError = errors.sum();
+ }
+
+ /**
+ * Get the squashing function of a specified layer.
+ *
+ * @param idx
+   * @return The squashing function of the specified layer.
+ */
+ public DoubleFunction getSquashingFunction(int idx) {
+ return this.squashingFunctionList.get(idx);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
new file mode 100644
index 0000000..effd5b0
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/LayeredNeuralNetworkTrainer.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.ipc.RPC;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The trainer that trains the {@link LayeredNeuralNetwork} based on the BSP
+ * framework.
+ *
+ */
+public final class LayeredNeuralNetworkTrainer
+ extends
+ BSP<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> {
+
+ private static final Log LOG = LogFactory
+ .getLog(LayeredNeuralNetworkTrainer.class);
+
+ /* When given peer is master worker: base of parameter merge */
+ /* When given peer is slave worker: neural network for training */
+ private LayeredNeuralNetwork inMemoryModel;
+
+ /* Job configuration */
+ private HamaConfiguration conf;
+
+ /* Default batch size */
+ private int batchSize;
+
+ /* whether it is converging or not */
+ private AtomicBoolean isConverge;
+
+ /* When given peer is master worker: Asynchronous parameter merger */
+ /* When given peer is slave worker: null */
+ private RPC.Server merger;
+
+ /* When given peer is master worker: null */
+ /* When given peer is slave worker: proxy to Asynchronous parameter merger */
+ private ParameterMerger proxy;
+
+ /**
+   * Returns true if this worker is the master worker.
+ *
+ * @param peer
+ * */
+ private boolean isMaster(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
+ return peer.getPeerIndex() == peer.getNumPeers() - 1;
+ }
+
+ @Override
+ /**
+   * If the model path is specified, load the existing model from the
+   * storage location.
+ */
+ public void setup(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
+ // At least one master & slave worker exist.
+ Preconditions.checkArgument(peer.getNumPeers() >= 2);
+ this.conf = peer.getConfiguration();
+
+ String modelPath = conf.get("model.path");
+ this.inMemoryModel = new LayeredNeuralNetwork(conf, modelPath);
+
+ this.batchSize = conf.getInt("training.batch.size", 50);
+ this.isConverge = new AtomicBoolean(false);
+
+ int slaveCount = peer.getNumPeers() - 1;
+ int mergeLimit = conf.getInt("training.max.iterations", 100000);
+ int convergenceCheckInterval = peer.getNumPeers()
+ * conf.getInt("convergence.check.interval", 2000);
+ String master = peer.getPeerName();
+ String masterAddr = master.substring(0, master.indexOf(':'));
+ int port = conf.getInt("sync.server.port", 40052);
+
+ if (isMaster(peer)) {
+ try {
+ this.merger = RPC.getServer(new ParameterMergerServer(inMemoryModel,
+ isConverge, slaveCount, mergeLimit, convergenceCheckInterval),
+ masterAddr, port, conf);
+ merger.start();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ LOG.info("Begin to train");
+ } else {
+ InetSocketAddress addr = new InetSocketAddress(masterAddr, port);
+ try {
+ this.proxy = (ParameterMerger) RPC.getProxy(ParameterMerger.class,
+ ParameterMerger.versionID, addr, conf);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ /**
+   * Write the trained model back to the storage location.
+ */
+ public void cleanup(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer) {
+ // write model to modelPath
+ if (isMaster(peer)) {
+ try {
+ LOG.info("Write model back to " + inMemoryModel.getModelPath());
+ this.inMemoryModel.writeModelToFile();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void bsp(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
+ throws IOException, SyncException, InterruptedException {
+ while (!this.isConverge.get()) {
+      // each slave worker calculates the matrix updates according to its
+      // local data and merges them with the master
+ if (!isMaster(peer)) {
+ calculateUpdates(peer);
+ }
+ }
+
+ if (isMaster(peer)) {
+ merger.stop();
+ }
+ peer.sync(); // finalize the bsp program.
+ }
+
+ /**
+ * Calculate the matrices updates according to local partition of data.
+ *
+ * @param peer
+ * @throws IOException
+ */
+ private void calculateUpdates(
+ BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, ParameterMessage> peer)
+ throws IOException {
+
+ DoubleMatrix[] weightUpdates = new DoubleMatrix[this.inMemoryModel.weightMatrixList
+ .size()];
+ for (int i = 0; i < weightUpdates.length; ++i) {
+ int row = this.inMemoryModel.weightMatrixList.get(i).getRowCount();
+ int col = this.inMemoryModel.weightMatrixList.get(i).getColumnCount();
+ weightUpdates[i] = new DenseDoubleMatrix(row, col);
+ }
+
+ // continue to train
+ double avgTrainingError = 0.0;
+ LongWritable key = new LongWritable();
+ VectorWritable value = new VectorWritable();
+ for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) {
+ if (!peer.readNext(key, value)) {
+ peer.reopenInput();
+ peer.readNext(key, value);
+ }
+ DoubleVector trainingInstance = value.getVector();
+ LayeredNeuralNetwork.matricesAdd(weightUpdates,
+ this.inMemoryModel.trainByInstance(trainingInstance));
+ avgTrainingError += this.inMemoryModel.trainingError;
+ }
+ avgTrainingError /= batchSize;
+
+ // calculate the average of updates
+ for (int i = 0; i < weightUpdates.length; ++i) {
+ weightUpdates[i] = weightUpdates[i].divide(batchSize);
+ }
+
+ // exchange parameter update with master
+ ParameterMessage msg = new ParameterMessage(
+ avgTrainingError, false, weightUpdates,
+ this.inMemoryModel.getPrevMatricesUpdates());
+
+ ParameterMessage inMessage = proxy.merge(msg);
+ DoubleMatrix[] newWeights = inMessage.getCurMatrices();
+ DoubleMatrix[] preWeightUpdates = inMessage.getPrevMatrices();
+ this.inMemoryModel.setWeightMatrices(newWeights);
+ this.inMemoryModel.setPrevWeightMatrices(preWeightUpdates);
+ this.isConverge.set(inMessage.isConverge());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/Neuron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/Neuron.java b/src/main/java/org/apache/horn/core/Neuron.java
new file mode 100644
index 0000000..357b42f
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/Neuron.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.commons.math.DoubleFunction;
+
+public abstract class Neuron<M extends Writable> implements NeuronInterface<M> {
+ double output;
+ double weight;
+ double delta;
+ protected DoubleFunction squashingFunction;
+
+  public void feedforward(double sum) {
+    // stores the activation passed in; applying the squashing function is
+    // left to the caller
+    this.output = sum;
+  }
+
+  public void backpropagate(double gradient) {
+    // stores the error gradient of this neuron
+    this.delta = gradient;
+  }
+
+ public double getDelta() {
+ return delta;
+ }
+
+ public void setWeight(double weight) {
+ this.weight = weight;
+ }
+
+ public void setOutput(double output) {
+ this.output = output;
+ }
+
+ public double getOutput() {
+ return output;
+ }
+
+  /* The methods below communicate with the parameter server */
+ private int i;
+
+ public void push(double weight) {
+ weights[i++] = weight;
+ }
+
+ public double getUpdate() {
+ return weight;
+ }
+
+ double[] weights;
+
+ public void setWeightVector(int rowCount) {
+ i = 0;
+ weights = new double[rowCount];
+ }
+
+ public double[] getWeights() {
+ return weights;
+ }
+
+ public void setSquashingFunction(DoubleFunction squashingFunction) {
+ this.squashingFunction = squashingFunction;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/NeuronInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/NeuronInterface.java b/src/main/java/org/apache/horn/core/NeuronInterface.java
new file mode 100644
index 0000000..5e4c113
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/NeuronInterface.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+
+public interface NeuronInterface<M extends Writable> {
+
+ public void setup(HamaConfiguration conf);
+
+ /**
+ * This method is called when the messages are propagated from the lower
+ * layer. It can be used to determine if the neuron would activate, or fire.
+ *
+ * @param messages
+ * @throws IOException
+ */
+ public void forward(Iterable<M> messages) throws IOException;
+
+ /**
+ * This method is called when the errors are propagated from the upper layer.
+ * It can be used to calculate the error of each neuron and change the
+ * weights.
+ *
+ * @param messages
+ * @throws IOException
+ */
+ public void backward(Iterable<M> messages) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/ParameterMerger.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/ParameterMerger.java b/src/main/java/org/apache/horn/core/ParameterMerger.java
new file mode 100644
index 0000000..512b402
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/ParameterMerger.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import org.apache.hama.ipc.VersionedProtocol;
+
+public interface ParameterMerger extends VersionedProtocol {
+ long versionID = 1L;
+
+ ParameterMessage merge(ParameterMessage msg);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/ParameterMergerServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/ParameterMergerServer.java b/src/main/java/org/apache/horn/core/ParameterMergerServer.java
new file mode 100644
index 0000000..c76a4d0
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/ParameterMergerServer.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.commons.math.DoubleMatrix;
+
+import com.google.common.base.Preconditions;
+
+public class ParameterMergerServer implements ParameterMerger {
+
+ private static final Log LOG = LogFactory.getLog(ParameterMergerServer.class);
+
+ /* The in-memory model that merged updates are applied to. */
+ protected LayeredNeuralNetwork inMemoryModel;
+
+ /* Whether training has converged and workers should terminate. */
+ protected AtomicBoolean isConverge;
+
+ /* The number of slave workers that request commits. */
+ protected int slaveCount;
+
+ /* After mergeLimit merges, terminate whether or not the result has converged. */
+ protected int mergeLimit;
+
+ /*
+  * The last n training errors. Convergence is decided based on the average
+  * of these errors.
+  */
+ protected double[] trainingErrors;
+
+ /*
+  * The average of the previous n training errors. If the new average is no
+  * smaller than this value, training is considered to have converged.
+  */
+ protected double prevAvgTrainingError = Double.MAX_VALUE;
+
+ /* Current write index into trainingErrors. */
+ protected int curTrainingError = 0;
+
+ /* The number of merges conducted so far. */
+ protected int mergeCount = 0;
+
+ public ParameterMergerServer(LayeredNeuralNetwork inMemoryModel,
+ AtomicBoolean isConverge, int slaveCount, int mergeLimit,
+ int convergenceCheckInterval) {
+ this.inMemoryModel = inMemoryModel;
+ this.isConverge = isConverge;
+ this.slaveCount = slaveCount;
+ this.mergeLimit = mergeLimit;
+ this.trainingErrors = new double[convergenceCheckInterval];
+ }
+
+ @Override
+ public long getProtocolVersion(String s, long l) throws IOException {
+ return ParameterMerger.versionID;
+ }
+
+ @Override
+ public ParameterMessage merge(ParameterMessage msg) {
+
+ double trainingError = msg.getTrainingError();
+ DoubleMatrix[] weightUpdates = msg.getCurMatrices();
+ DoubleMatrix[] prevWeightUpdates = msg.getPrevMatrices();
+
+ Preconditions
+ .checkArgument(weightUpdates.length == prevWeightUpdates.length);
+
+ LOG.info("Start merging: " + this.mergeCount);
+
+ if (!this.isConverge.get()) {
+ for (int i = 0; i < weightUpdates.length; ++i) {
+ weightUpdates[i] = weightUpdates[i].divide(this.slaveCount);
+ prevWeightUpdates[i] = prevWeightUpdates[i].divide(this.slaveCount);
+ }
+
+ synchronized (inMemoryModel) {
+ this.inMemoryModel.updateWeightMatrices(weightUpdates);
+ this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
+
+ // add trainingError to trainingErrors
+ this.trainingErrors[this.curTrainingError++] = trainingError;
+
+ // check convergence
+ if (this.trainingErrors.length == this.curTrainingError) {
+ double curAvgTrainingError = 0.0;
+ for (int i = 0; i < this.curTrainingError; ++i) {
+ curAvgTrainingError += this.trainingErrors[i];
+ }
+ curAvgTrainingError /= this.trainingErrors.length;
+
+ if (prevAvgTrainingError < curAvgTrainingError) {
+ this.isConverge.set(true);
+ } else {
+ // update
+ prevAvgTrainingError = curAvgTrainingError;
+ this.curTrainingError = 0;
+ }
+ }
+
+ if (++this.mergeCount == this.mergeLimit) {
+ this.isConverge.set(true);
+ }
+ }
+ }
+
+ return new ParameterMessage(0, this.isConverge.get(),
+ this.inMemoryModel.getWeightMatrices(),
+ this.inMemoryModel.getPrevMatricesUpdates());
+ }
+
+}
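The convergence test inside merge() is easier to read in isolation: training errors fill a fixed-size window, and once the window is full its average is compared against the previous window's average; training is declared converged as soon as the average stops decreasing, or unconditionally once mergeLimit merges have run. A self-contained restatement of the windowed rule, with invented names, might look like this:

/** Hypothetical, standalone restatement of the server's convergence test. */
public class ConvergenceWindow {
  private final double[] errors;
  private int idx = 0;
  private double prevAvg = Double.MAX_VALUE;

  public ConvergenceWindow(int n) {
    this.errors = new double[n];
  }

  /** Records one training error; returns true once converged. */
  public boolean offer(double trainingError) {
    errors[idx++] = trainingError;
    if (idx < errors.length) {
      return false; // window not yet full
    }
    double avg = 0;
    for (double e : errors) {
      avg += e;
    }
    avg /= errors.length;
    if (prevAvg < avg) {
      return true; // average error stopped decreasing
    }
    prevAvg = avg;
    idx = 0; // start a new window
    return false;
  }
}

With a window of three, error windows averaging 0.80, 0.77, and then 0.79 keep training alive twice and trigger convergence on the third window, since 0.79 is no longer an improvement over 0.77.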
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/ParameterMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/ParameterMessage.java b/src/main/java/org/apache/horn/core/ParameterMessage.java
new file mode 100644
index 0000000..3905e25
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/ParameterMessage.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.commons.io.MatrixWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DoubleMatrix;
+
+/**
+ * ParameterMessage carries the parameters exchanged between workers and the
+ * parameter server during the training of neural networks: the training
+ * error, the convergence flag, and the current and previous weight matrices.
+ */
+public class ParameterMessage implements Writable {
+
+ protected double trainingError;
+ protected DoubleMatrix[] curMatrices;
+ protected DoubleMatrix[] prevMatrices;
+ protected boolean converge;
+
+ public ParameterMessage() {
+ }
+
+ public ParameterMessage(double trainingError, boolean converge,
+ DoubleMatrix[] weightMatrices, DoubleMatrix[] prevMatrices) {
+ this.trainingError = trainingError;
+ this.converge = converge;
+ this.curMatrices = weightMatrices;
+ this.prevMatrices = prevMatrices;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ trainingError = input.readDouble();
+ converge = input.readBoolean();
+ int numMatrices = input.readInt();
+ boolean hasPrevMatrices = input.readBoolean();
+ curMatrices = new DenseDoubleMatrix[numMatrices];
+ // read the current weight matrix updates
+ for (int i = 0; i < curMatrices.length; ++i) {
+ curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+ }
+
+ if (hasPrevMatrices) {
+ prevMatrices = new DenseDoubleMatrix[numMatrices];
+ // read the previous weight matrix updates
+ for (int i = 0; i < prevMatrices.length; ++i) {
+ prevMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeDouble(trainingError);
+ output.writeBoolean(converge);
+ output.writeInt(curMatrices.length);
+ output.writeBoolean(prevMatrices != null);
+ for (DoubleMatrix matrix : curMatrices) {
+ MatrixWritable.write(matrix, output);
+ }
+ if (prevMatrices != null) {
+ for (DoubleMatrix matrix : prevMatrices) {
+ MatrixWritable.write(matrix, output);
+ }
+ }
+ }
+
+ public double getTrainingError() {
+ return trainingError;
+ }
+
+ public void setTrainingError(double trainingError) {
+ this.trainingError = trainingError;
+ }
+
+ public boolean isConverge() {
+ return converge;
+ }
+
+ public void setConverge(boolean converge) {
+ this.converge = converge;
+ }
+
+ public DoubleMatrix[] getCurMatrices() {
+ return curMatrices;
+ }
+
+ public void setMatrices(DoubleMatrix[] curMatrices) {
+ this.curMatrices = curMatrices;
+ }
+
+ public DoubleMatrix[] getPrevMatrices() {
+ return prevMatrices;
+ }
+
+ public void setPrevMatrices(DoubleMatrix[] prevMatrices) {
+ this.prevMatrices = prevMatrices;
+ }
+
+}
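A quick way to sanity-check this wire format is to round-trip a message through in-memory streams. The sketch below is illustrative only and not part of the commit; the matrix contents are arbitrary, and it assumes Hama's DenseDoubleMatrix constructor taking a double[][].

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hama.commons.math.DenseDoubleMatrix;
import org.apache.hama.commons.math.DoubleMatrix;
import org.apache.horn.core.ParameterMessage;

public class ParameterMessageRoundTrip {
  public static void main(String[] args) throws IOException {
    DoubleMatrix[] cur = { new DenseDoubleMatrix(new double[][] { { 0.1, 0.2 } }) };
    DoubleMatrix[] prev = { new DenseDoubleMatrix(new double[][] { { 0.0, 0.0 } }) };
    ParameterMessage out = new ParameterMessage(0.42, false, cur, prev);

    // Serialize exactly as the BSP framework would.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    out.write(new DataOutputStream(buf));

    // Deserialize into a fresh instance and check that fields survived.
    ParameterMessage in = new ParameterMessage();
    in.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(in.getTrainingError()); // prints 0.42
    System.out.println(in.isConverge());       // prints false
  }
}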
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/core/Synapse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/core/Synapse.java b/src/main/java/org/apache/horn/core/Synapse.java
new file mode 100644
index 0000000..714767b
--- /dev/null
+++ b/src/main/java/org/apache/horn/core/Synapse.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable wrapper for a message propagating between two adjacent layers,
+ * carrying the activation (or error) value together with the weight of the
+ * connection it travels along.
+ */
+public class Synapse<M extends Writable, W extends Writable> implements
+ Writable {
+
+ // Hadoop's Writable contract requires a no-arg constructor and non-null
+ // fields before readFields() is invoked.
+ DoubleWritable message = new DoubleWritable();
+ DoubleWritable weight = new DoubleWritable();
+ DoubleWritable prevWeight = new DoubleWritable();
+
+ public Synapse() {
+ }
+
+ public Synapse(DoubleWritable message, DoubleWritable weight) {
+ this.message = message;
+ this.weight = weight;
+ }
+
+ public Synapse(DoubleWritable message, DoubleWritable weight, DoubleWritable prevWeight) {
+ this.message = message;
+ this.weight = weight;
+ this.prevWeight = prevWeight;
+ }
+
+ /**
+  * @return the activation or error message
+  */
+ public double getMessage() {
+ return message.get();
+ }
+
+ /**
+  * @return the input (activation) carried during the forward pass; an alias
+  *         for getMessage()
+  */
+ public double getInput() {
+ return message.get();
+ }
+
+ /**
+  * @return the error delta carried during the backward pass; an alias for
+  *         getMessage()
+  */
+ public double getDelta() {
+ return message.get();
+ }
+
+ public double getWeight() {
+ return weight.get();
+ }
+
+ public double getPrevWeight() {
+ return prevWeight.get();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ message.readFields(in);
+ weight.readFields(in);
+ // also restore the previous weight so it survives (de)serialization
+ prevWeight.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ message.write(out);
+ weight.write(out);
+ prevWeight.write(out);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
index f66344c..c3bf180 100644
--- a/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
+++ b/src/main/java/org/apache/horn/examples/MultiLayerPerceptron.java
@@ -21,9 +21,9 @@ import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hama.HamaConfiguration;
-import org.apache.horn.bsp.HornJob;
-import org.apache.horn.bsp.Neuron;
-import org.apache.horn.bsp.Synapse;
+import org.apache.horn.core.HornJob;
+import org.apache.horn.core.Neuron;
+import org.apache.horn.core.Synapse;
import org.apache.horn.funcs.CrossEntropy;
import org.apache.horn.funcs.Sigmoid;
@@ -101,7 +101,7 @@ public class MultiLayerPerceptron {
InterruptedException, ClassNotFoundException {
if (args.length < 9) {
System.out
- .println("Usage: model_path training_set learning_rate momentum regularization_weight feature_dimension label_dimension max_iteration num_tasks");
+ .println("Usage: <MODEL_PATH> <INPUT_PATH> <LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT> <FEATURE_DIMENSION> <LABEL_DIMENSION> <MAX_ITERATION> <NUM_TASKS>");
System.exit(1);
}
HornJob ann = createJob(new HamaConfiguration(), args[0], args[1],
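For orientation, the sketch below shows how those nine arguments typically map onto a HornJob, using only setters that appear in HornJob elsewhere in this commit. The paths, dimensions, and hidden-layer size are invented, and <NUM_TASKS> is omitted since its setter does not appear in this diff.

import java.io.IOException;

import org.apache.hama.HamaConfiguration;
import org.apache.horn.core.HornJob;
import org.apache.horn.funcs.CrossEntropy;
import org.apache.horn.funcs.Sigmoid;

public class JobSetupSketch {
  public static HornJob sketch(HamaConfiguration conf) throws IOException {
    HornJob job = new HornJob(conf, JobSetupSketch.class);
    job.setModelPath("/tmp/model");           // <MODEL_PATH> (invented)
    job.setTrainingSetPath("/tmp/train.seq"); // <INPUT_PATH> (invented)
    job.setLearningRate(0.4);                 // <LEARNING_RATE>
    job.setMomentumWeight(0.2);               // <MOMENTUM_WEIGHT>
    job.setRegularizationWeight(0.01);        // <REGULARIZATION_WEIGHT>
    job.setMaxIteration(1000);                // <MAX_ITERATION>

    job.inputLayer(784, Sigmoid.class);       // <FEATURE_DIMENSION> (invented)
    job.addLayer(100, Sigmoid.class);         // hidden layer; size is invented
    job.outputLayer(10, Sigmoid.class);       // <LABEL_DIMENSION> (invented)
    job.setCostFunction(CrossEntropy.class);
    return job;
  }
}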
[4/4] incubator-horn git commit: Code refactoring
Posted by ed...@apache.org.
Code refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-horn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-horn/commit/ac8eaf8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-horn/tree/ac8eaf8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-horn/diff/ac8eaf8e
Branch: refs/heads/master
Commit: ac8eaf8e179b147777e7f1b9312045a42df86373
Parents: 277773c
Author: Edward J. Yoon <ed...@apache.org>
Authored: Tue Apr 26 12:30:53 2016 +0900
Committer: Edward J. Yoon <ed...@apache.org>
Committed: Tue Apr 26 12:42:47 2016 +0900
----------------------------------------------------------------------
.../horn/bsp/AbstractLayeredNeuralNetwork.java | 221 -------
.../java/org/apache/horn/bsp/AutoEncoder.java | 197 ------
src/main/java/org/apache/horn/bsp/HornJob.java | 109 ----
.../java/org/apache/horn/bsp/NeuralNetwork.java | 237 -------
.../apache/horn/bsp/NeuralNetworkTrainer.java | 106 ---
src/main/java/org/apache/horn/bsp/Neuron.java | 82 ---
.../org/apache/horn/bsp/NeuronInterface.java | 48 --
.../org/apache/horn/bsp/ParameterMerger.java | 27 -
.../apache/horn/bsp/ParameterMergerServer.java | 132 ----
.../horn/bsp/SmallLayeredNeuralNetwork.java | 620 ------------------
.../bsp/SmallLayeredNeuralNetworkMessage.java | 126 ----
.../bsp/SmallLayeredNeuralNetworkTrainer.java | 216 -------
src/main/java/org/apache/horn/bsp/Synapse.java | 85 ---
.../horn/core/AbstractLayeredNeuralNetwork.java | 221 +++++++
.../apache/horn/core/AbstractNeuralNetwork.java | 237 +++++++
.../horn/core/AbstractNeuralNetworkTrainer.java | 108 ++++
.../java/org/apache/horn/core/AutoEncoder.java | 197 ++++++
src/main/java/org/apache/horn/core/HornJob.java | 109 ++++
.../apache/horn/core/LayeredNeuralNetwork.java | 621 ++++++++++++++++++
.../horn/core/LayeredNeuralNetworkTrainer.java | 216 +++++++
src/main/java/org/apache/horn/core/Neuron.java | 82 +++
.../org/apache/horn/core/NeuronInterface.java | 48 ++
.../org/apache/horn/core/ParameterMerger.java | 27 +
.../apache/horn/core/ParameterMergerServer.java | 132 ++++
.../org/apache/horn/core/ParameterMessage.java | 125 ++++
src/main/java/org/apache/horn/core/Synapse.java | 85 +++
.../horn/examples/MultiLayerPerceptron.java | 8 +-
.../org/apache/horn/examples/NeuralNetwork.java | 217 -------
.../java/org/apache/horn/bsp/MLTestBase.java | 64 --
.../org/apache/horn/bsp/TestAutoEncoder.java | 202 ------
.../horn/bsp/TestSmallLayeredNeuralNetwork.java | 642 -------------------
.../TestSmallLayeredNeuralNetworkMessage.java | 172 -----
.../java/org/apache/horn/core/MLTestBase.java | 64 ++
.../org/apache/horn/core/TestAutoEncoder.java | 203 ++++++
.../java/org/apache/horn/core/TestNeuron.java | 93 +++
.../core/TestSmallLayeredNeuralNetwork.java | 642 +++++++++++++++++++
.../TestSmallLayeredNeuralNetworkMessage.java | 173 +++++
.../horn/examples/MultiLayerPerceptronTest.java | 177 +++++
.../apache/horn/examples/NeuralNetworkTest.java | 212 ------
.../org/apache/horn/trainer/TestNeuron.java | 93 ---
40 files changed, 3564 insertions(+), 3812 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
deleted file mode 100644
index b18eb44..0000000
--- a/src/main/java/org/apache/horn/bsp/AbstractLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.math.DoubleDoubleFunction;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * AbstractLayeredNeuralNetwork defines the general operations for derivative
- * layered models, include Linear Regression, Logistic Regression, Multilayer
- * Perceptron, Autoencoder, and Restricted Boltzmann Machine, etc.
- *
- * In general, these models consist of neurons which are aligned in layers.
- * Between layers, for any two adjacent layers, the neurons are connected to
- * form a bipartite weighted graph.
- *
- */
-abstract class AbstractLayeredNeuralNetwork extends NeuralNetwork {
-
- private static final double DEFAULT_MOMENTUM_WEIGHT = 0.1;
-
- double trainingError;
-
- /* The momentumWeight */
- protected double momentumWeight;
-
- /* The cost function of the model */
- protected DoubleDoubleFunction costFunction;
-
- /* Record the size of each layer */
- protected List<Integer> layerSizeList;
-
- protected TrainingMethod trainingMethod;
-
- protected LearningStyle learningStyle;
-
- public static enum TrainingMethod {
- GRADIENT_DESCENT
- }
-
- public static enum LearningStyle {
- UNSUPERVISED,
- SUPERVISED
- }
-
- public AbstractLayeredNeuralNetwork() {
- this.momentumWeight = DEFAULT_MOMENTUM_WEIGHT;
- this.trainingMethod = TrainingMethod.GRADIENT_DESCENT;
- this.learningStyle = LearningStyle.SUPERVISED;
- }
-
- public AbstractLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
- super(conf, modelPath);
- }
-
- public void setTrainingMethod(TrainingMethod method) {
- this.trainingMethod = method;
- }
-
- public TrainingMethod getTrainingMethod() {
- return this.trainingMethod;
- }
-
- public void setLearningStyle(LearningStyle style) {
- this.learningStyle = style;
- }
-
- public LearningStyle getLearningStyle() {
- return this.learningStyle;
- }
-
- /**
- * Set the cost function for the model.
- *
- * @param costFunction
- */
- public void setCostFunction(DoubleDoubleFunction costFunction) {
- this.costFunction = costFunction;
- }
-
- /**
- * Add a layer of neurons with specified size. If the added layer is not the
- * first layer, it will automatically connects the neurons between with the
- * previous layer.
- *
- * @param size
- * @param isFinalLayer If false, add a bias neuron.
- * @param squashingFunction The squashing function for this layer, input layer
- * is f(x) = x by default.
- * @return The layer index, starts with 0.
- */
- public abstract int addLayer(int size, boolean isFinalLayer,
- DoubleFunction squashingFunction);
-
- /**
- * Get the size of a particular layer.
- *
- * @param layer
- * @return The layer size.
- */
- public int getLayerSize(int layer) {
- Preconditions.checkArgument(
- layer >= 0 && layer < this.layerSizeList.size(),
- String.format("Input must be in range [0, %d]\n",
- this.layerSizeList.size() - 1));
- return this.layerSizeList.get(layer);
- }
-
- /**
- * Get the layer size list.
- *
- * @return The layer size list.
- */
- protected List<Integer> getLayerSizeList() {
- return this.layerSizeList;
- }
-
- /**
- * Get the weights between layer layerIdx and layerIdx + 1
- *
- * @param layerIdx The index of the layer
- * @return The weights in form of {@link DoubleMatrix}
- */
- public abstract DoubleMatrix getWeightsByLayer(int layerIdx);
-
- /**
- * Get the updated weights using one training instance.
- *
- * @param trainingInstance The trainingInstance is the concatenation of
- * feature vector and class label vector.
- * @return The update of each weight, in form of matrix list.
- * @throws Exception
- */
- public abstract DoubleMatrix[] trainByInstance(DoubleVector trainingInstance);
-
- /**
- * Get the output calculated by the model.
- *
- * @param instance The feature instance.
- * @return a new vector with the result of the operation.
- */
- public abstract DoubleVector getOutput(DoubleVector instance);
-
- /**
- * Calculate the training error based on the labels and outputs.
- *
- * @param labels
- * @param output
- */
- protected abstract void calculateTrainingError(DoubleVector labels,
- DoubleVector output);
-
- @Override
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
- // read momentum weight
- this.momentumWeight = input.readDouble();
-
- // read cost function
- this.costFunction = FunctionFactory
- .createDoubleDoubleFunction(WritableUtils.readString(input));
-
- // read layer size list
- int numLayers = input.readInt();
- this.layerSizeList = Lists.newArrayList();
- for (int i = 0; i < numLayers; ++i) {
- this.layerSizeList.add(input.readInt());
- }
-
- this.trainingMethod = WritableUtils.readEnum(input, TrainingMethod.class);
- this.learningStyle = WritableUtils.readEnum(input, LearningStyle.class);
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- super.write(output);
- // write momentum weight
- output.writeDouble(this.momentumWeight);
-
- // write cost function
- WritableUtils.writeString(output, costFunction.getFunctionName());
-
- // write layer size list
- output.writeInt(this.layerSizeList.size());
- for (Integer aLayerSizeList : this.layerSizeList) {
- output.writeInt(aLayerSizeList);
- }
-
- WritableUtils.writeEnum(output, this.trainingMethod);
- WritableUtils.writeEnum(output, this.learningStyle);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/AutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/AutoEncoder.java b/src/main/java/org/apache/horn/bsp/AutoEncoder.java
deleted file mode 100644
index 8ea2930..0000000
--- a/src/main/java/org/apache/horn/bsp/AutoEncoder.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ml.util.FeatureTransformer;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * AutoEncoder is a model used for dimensional reduction and feature learning.
- * It is a special kind of {@link NeuralNetwork} that consists of three layers
- * of neurons, where the first layer and third layer contains the same number of
- * neurons.
- *
- */
-public class AutoEncoder {
-
- private final SmallLayeredNeuralNetwork model;
-
- /**
- * Initialize the autoencoder.
- *
- * @param inputDimensions The number of dimensions for the input feature.
- * @param compressedDimensions The number of dimensions for the compressed
- * information.
- */
- public AutoEncoder(int inputDimensions, int compressedDimensions) {
- model = new SmallLayeredNeuralNetwork();
- model.addLayer(inputDimensions, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- model.addLayer(compressedDimensions, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- model.addLayer(inputDimensions, true,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- model
- .setLearningStyle(AbstractLayeredNeuralNetwork.LearningStyle.UNSUPERVISED);
- model.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("SquaredError"));
- }
-
- public AutoEncoder(HamaConfiguration conf, String modelPath) {
- model = new SmallLayeredNeuralNetwork(conf, modelPath);
- }
-
- public AutoEncoder setModelPath(String modelPath) {
- model.setModelPath(modelPath);
- return this;
- }
-
- /**
- * Train the autoencoder with given data. Note that the training data is
- * pre-processed, where the features
- *
- * @param dataInputPath
- * @param trainingParams
- * @throws InterruptedException
- * @throws IOException
- * @throws ClassNotFoundException
- */
- public BSPJob train(HamaConfiguration conf, Path dataInputPath,
- Map<String, String> trainingParams) throws ClassNotFoundException, IOException, InterruptedException {
- return model.train(conf);
- }
-
- /**
- * Train the model with one instance.
- *
- * @param trainingInstance
- */
- public void trainOnline(DoubleVector trainingInstance) {
- model.trainOnline(trainingInstance);
- }
-
- /**
- * Get the matrix M used to encode the input features.
- *
- * @return this matrix with encode the input.
- */
- public DoubleMatrix getEncodeWeightMatrix() {
- return model.getWeightsByLayer(0);
- }
-
- /**
- * Get the matrix M used to decode the compressed information.
- *
- * @return this matrix with decode the compressed information.
- */
- public DoubleMatrix getDecodeWeightMatrix() {
- return model.getWeightsByLayer(1);
- }
-
- /**
- * Transform the input features.
- *
- * @param inputInstance
- * @return The compressed information.
- */
- private DoubleVector transform(DoubleVector inputInstance, int inputLayer) {
- DoubleVector internalInstance = new DenseDoubleVector(
- inputInstance.getDimension() + 1);
- internalInstance.set(0, 1);
- for (int i = 0; i < inputInstance.getDimension(); ++i) {
- internalInstance.set(i + 1, inputInstance.get(i));
- }
- DoubleFunction squashingFunction = model.getSquashingFunction(inputLayer);
- DoubleMatrix weightMatrix = null;
- if (inputLayer == 0) {
- weightMatrix = this.getEncodeWeightMatrix();
- } else {
- weightMatrix = this.getDecodeWeightMatrix();
- }
- DoubleVector vec = weightMatrix.multiplyVectorUnsafe(internalInstance);
- vec = vec.applyToElements(squashingFunction);
- return vec;
- }
-
- /**
- * Encode the input instance.
- *
- * @param inputInstance
- * @return a new vector with the encode input instance.
- */
- public DoubleVector encode(DoubleVector inputInstance) {
- Preconditions
- .checkArgument(
- inputInstance.getDimension() == model.getLayerSize(0) - 1,
- String
- .format(
- "The dimension of input instance is %d, but the model requires dimension %d.",
- inputInstance.getDimension(), model.getLayerSize(1) - 1));
- return this.transform(inputInstance, 0);
- }
-
- /**
- * Decode the input instance.
- *
- * @param inputInstance
- * @return a new vector with the decode input instance.
- */
- public DoubleVector decode(DoubleVector inputInstance) {
- Preconditions
- .checkArgument(
- inputInstance.getDimension() == model.getLayerSize(1) - 1,
- String
- .format(
- "The dimension of input instance is %d, but the model requires dimension %d.",
- inputInstance.getDimension(), model.getLayerSize(1) - 1));
- return this.transform(inputInstance, 1);
- }
-
- /**
- * Get the label(s) according to the given features.
- *
- * @param inputInstance
- * @return a new vector with output of the model according to given feature
- * instance.
- */
- public DoubleVector getOutput(DoubleVector inputInstance) {
- return model.getOutput(inputInstance);
- }
-
- /**
- * Set the feature transformer.
- *
- * @param featureTransformer
- */
- public void setFeatureTransformer(FeatureTransformer featureTransformer) {
- this.model.setFeatureTransformer(featureTransformer);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/HornJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/HornJob.java b/src/main/java/org/apache/horn/bsp/HornJob.java
deleted file mode 100644
index 4521b87..0000000
--- a/src/main/java/org/apache/horn/bsp/HornJob.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.math.Function;
-import org.apache.horn.funcs.FunctionFactory;
-
-public class HornJob extends BSPJob {
-
- SmallLayeredNeuralNetwork neuralNetwork;
-
- public HornJob(HamaConfiguration conf, Class<?> exampleClass)
- throws IOException {
- super(conf);
- this.setJarByClass(exampleClass);
-
- neuralNetwork = new SmallLayeredNeuralNetwork();
- }
-
- public void inputLayer(int featureDimension, Class<? extends Function> func) {
- addLayer(featureDimension, func);
- }
-
- public void addLayer(int featureDimension, Class<? extends Function> func) {
- neuralNetwork.addLayer(featureDimension, false,
- FunctionFactory.createDoubleFunction(func.getSimpleName()));
- }
-
- public void outputLayer(int labels, Class<? extends Function> func) {
- neuralNetwork.addLayer(labels, true,
- FunctionFactory.createDoubleFunction(func.getSimpleName()));
- }
-
- public void setCostFunction(Class<? extends Function> func) {
- neuralNetwork.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction(func.getSimpleName()));
- }
-
- public void setDouble(String name, double value) {
- conf.setDouble(name, value);
- }
-
- public void setMaxIteration(int maxIteration) {
- this.conf.setInt("training.max.iterations", maxIteration);
- }
-
- public void setBatchSize(int batchSize) {
- this.conf.setInt("training.batch.size", batchSize);
- }
-
- public void setLearningRate(double learningRate) {
- this.conf.setDouble("mlp.learning.rate", learningRate);
- }
-
- public void setConvergenceCheckInterval(int n) {
- this.conf.setInt("convergence.check.interval", n);
- }
-
- public void setMomentumWeight(double momentumWeight) {
- this.conf.setDouble("mlp.momentum.weight", momentumWeight);
- }
-
- public SmallLayeredNeuralNetwork getNeuralNetwork() {
- return neuralNetwork;
- }
-
- public boolean waitForCompletion(boolean verbose) throws IOException,
- InterruptedException, ClassNotFoundException {
- BSPJob job = neuralNetwork.train(this.conf);
- if (verbose) {
- return job.waitForCompletion(true);
- } else {
- return job.waitForCompletion(false);
- }
- }
-
- public void setRegularizationWeight(double regularizationWeight) {
- this.conf.setDouble("regularization.weight", regularizationWeight);
- }
-
- public void setModelPath(String modelPath) {
- this.conf.set("model.path", modelPath);
- neuralNetwork.setModelPath(modelPath);
- }
-
- public void setTrainingSetPath(String inputPath) {
- this.conf.set("training.input.path", inputPath);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java b/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
deleted file mode 100644
index 051881d..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuralNetwork.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closeables;
-
-/**
- * NeuralNetwork defines the general operations for all the derivative models.
- * Typically, all derivative models such as Linear Regression, Logistic
- * Regression, and Multilayer Perceptron consist of neurons and the weights
- * between neurons.
- *
- */
-abstract class NeuralNetwork implements Writable {
- protected HamaConfiguration conf;
- protected FileSystem fs;
-
- private static final double DEFAULT_LEARNING_RATE = 0.5;
-
- protected double learningRate;
- protected boolean learningRateDecay = false;
-
- // the name of the model
- protected String modelType;
- // the path to store the model
- protected String modelPath;
-
- protected FeatureTransformer featureTransformer;
-
- public NeuralNetwork() {
- this.learningRate = DEFAULT_LEARNING_RATE;
- this.modelType = this.getClass().getSimpleName();
- this.featureTransformer = new DefaultFeatureTransformer();
- }
-
- public NeuralNetwork(String modelPath) {
- this.modelPath = modelPath;
- }
-
- public NeuralNetwork(HamaConfiguration conf, String modelPath) {
- try {
- this.conf = conf;
- this.fs = FileSystem.get(conf);
- this.modelPath = modelPath;
-
- this.readFromModel();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- public void isLearningRateDecay(boolean decay) {
- this.learningRateDecay = decay;
- }
-
- public String getModelType() {
- return this.modelType;
- }
-
- /**
- * Train the model with the path of given training data and parameters.
- *
- * @param dataInputPath The path of the training data.
- * @param trainingParams The parameters for training.
- * @throws InterruptedException
- * @throws ClassNotFoundException
- * @throws IOException
- */
- public BSPJob train(Configuration conf) throws ClassNotFoundException, IOException, InterruptedException {
- Preconditions.checkArgument(this.modelPath != null,
- "Please set the model path before training.");
-
- // train with BSP job
- return trainInternal((HamaConfiguration) conf);
- }
-
- /**
- * Train the model with the path of given training data and parameters.
- */
- protected abstract BSPJob trainInternal(HamaConfiguration hamaConf)
- throws IOException, InterruptedException, ClassNotFoundException;
-
- /**
- * Read the model meta-data from the specified location.
- *
- * @throws IOException
- */
- protected void readFromModel() throws IOException {
- Preconditions.checkArgument(this.modelPath != null,
- "Model path has not been set.");
- FSDataInputStream is = new FSDataInputStream(fs.open(new Path(modelPath)));
- this.readFields(is);
- Closeables.close(is, false);
- }
-
- /**
- * Write the model data to specified location.
- *
- * @throws IOException
- */
- public void writeModelToFile() throws IOException {
- Preconditions.checkArgument(this.modelPath != null,
- "Model path has not been set.");
-
- FSDataOutputStream is = fs.create(new Path(this.modelPath), true);
- this.write(is);
-
- Closeables.close(is, false);
- }
-
- /**
- * Set the model path.
- *
- * @param modelPath
- */
- public void setModelPath(String modelPath) {
- this.modelPath = modelPath;
- }
-
- /**
- * Get the model path.
- *
- * @return the path to store the model.
- */
- public String getModelPath() {
- return this.modelPath;
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public void readFields(DataInput input) throws IOException {
- // read model type
- this.modelType = WritableUtils.readString(input);
- // read learning rate
- this.learningRate = input.readDouble();
- // read model path
- this.modelPath = WritableUtils.readString(input);
-
- if (this.modelPath.equals("null")) {
- this.modelPath = null;
- }
-
- // read feature transformer
- int bytesLen = input.readInt();
- byte[] featureTransformerBytes = new byte[bytesLen];
- for (int i = 0; i < featureTransformerBytes.length; ++i) {
- featureTransformerBytes[i] = input.readByte();
- }
-
- Class<? extends FeatureTransformer> featureTransformerCls = (Class<? extends FeatureTransformer>) SerializationUtils
- .deserialize(featureTransformerBytes);
-
- Constructor[] constructors = featureTransformerCls
- .getDeclaredConstructors();
- Constructor constructor = constructors[0];
-
- try {
- this.featureTransformer = (FeatureTransformer) constructor
- .newInstance(new Object[] {});
- } catch (InstantiationException e) {
- e.printStackTrace();
- } catch (IllegalAccessException e) {
- e.printStackTrace();
- } catch (IllegalArgumentException e) {
- e.printStackTrace();
- } catch (InvocationTargetException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- // write model type
- WritableUtils.writeString(output, modelType);
- // write learning rate
- output.writeDouble(learningRate);
- // write model path
- if (this.modelPath != null) {
- WritableUtils.writeString(output, modelPath);
- } else {
- WritableUtils.writeString(output, "null");
- }
-
- // serialize the class
- Class<? extends FeatureTransformer> featureTransformerCls = this.featureTransformer
- .getClass();
- byte[] featureTransformerBytes = SerializationUtils
- .serialize(featureTransformerCls);
- output.writeInt(featureTransformerBytes.length);
- output.write(featureTransformerBytes);
- }
-
- public void setFeatureTransformer(FeatureTransformer featureTransformer) {
- this.featureTransformer = featureTransformer;
- }
-
- public FeatureTransformer getFeatureTransformer() {
- return this.featureTransformer;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java b/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
deleted file mode 100644
index 648e86b..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuralNetworkTrainer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-
-/**
- * The trainer that is used to train the {@link SmallLayeredNeuralNetwork} with
- * BSP. The trainer would read the training data and obtain the trained
- * parameters of the model.
- *
- */
-public abstract class NeuralNetworkTrainer extends
- BSP<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> {
-
- protected static final Log LOG = LogFactory
- .getLog(NeuralNetworkTrainer.class);
-
- protected Configuration conf;
- protected int maxIteration;
- protected int batchSize;
- protected String trainingMode;
-
- protected FeatureTransformer featureTransformer;
-
- @Override
- final public void setup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException, SyncException, InterruptedException {
- conf = peer.getConfiguration();
- featureTransformer = new DefaultFeatureTransformer();
- this.extraSetup(peer);
- }
-
- /**
- * Handle extra setup for sub-classes.
- *
- * @param peer
- * @throws IOException
- * @throws SyncException
- * @throws InterruptedException
- */
- protected void extraSetup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException, SyncException, InterruptedException {
-
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public abstract void bsp(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException, SyncException, InterruptedException;
-
- @Override
- public void cleanup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException {
- this.extraCleanup(peer);
- // write model to modelPath
- }
-
- /**
- * Handle cleanup for sub-classes. Write the trained model back.
- *
- * @param peer
- * @throws IOException
- * @throws SyncException
- * @throws InterruptedException
- */
- protected void extraCleanup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, Synapse> peer)
- throws IOException {
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/Neuron.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/Neuron.java b/src/main/java/org/apache/horn/bsp/Neuron.java
deleted file mode 100644
index f122b6d..0000000
--- a/src/main/java/org/apache/horn/bsp/Neuron.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.commons.math.DoubleFunction;
-
-public abstract class Neuron<M extends Writable> implements NeuronInterface<M> {
- double output;
- double weight;
- double delta;
- protected DoubleFunction squashingFunction;
-
- public void feedforward(double sum) {
- // TODO Auto-generated method stub
- // squashing
- this.output = sum;
- }
-
- public void backpropagate(double gradient) {
- // TODO Auto-generated method stub
- this.delta = gradient;
- }
-
- public double getDelta() {
- return delta;
- }
-
- public void setWeight(double weight) {
- this.weight = weight;
- }
-
- public void setOutput(double output) {
- this.output = output;
- }
-
- public double getOutput() {
- return output;
- }
-
- // ////////* Below methods will communicate with parameter server */
- private int i;
-
- public void push(double weight) {
- weights[i++] = weight;
- }
-
- public double getUpdate() {
- return weight;
- }
-
- double[] weights;
-
- public void setWeightVector(int rowCount) {
- i = 0;
- weights = new double[rowCount];
- }
-
- public double[] getWeights() {
- return weights;
- }
-
- public void setSquashingFunction(DoubleFunction squashingFunction) {
- this.squashingFunction = squashingFunction;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/NeuronInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/NeuronInterface.java b/src/main/java/org/apache/horn/bsp/NeuronInterface.java
deleted file mode 100644
index bcc1a5a..0000000
--- a/src/main/java/org/apache/horn/bsp/NeuronInterface.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.HamaConfiguration;
-
-public interface NeuronInterface<M extends Writable> {
-
- public void setup(HamaConfiguration conf);
-
- /**
- * This method is called when the messages are propagated from the lower
- * layer. It can be used to determine if the neuron would activate, or fire.
- *
- * @param messages
- * @throws IOException
- */
- public void forward(Iterable<M> messages) throws IOException;
-
- /**
- * This method is called when the errors are propagated from the upper layer.
- * It can be used to calculate the error of each neuron and change the
- * weights.
- *
- * @param messages
- * @throws IOException
- */
- public void backward(Iterable<M> messages) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/ParameterMerger.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/ParameterMerger.java b/src/main/java/org/apache/horn/bsp/ParameterMerger.java
deleted file mode 100644
index 6df719a..0000000
--- a/src/main/java/org/apache/horn/bsp/ParameterMerger.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import org.apache.hama.ipc.VersionedProtocol;
-
-public interface ParameterMerger extends VersionedProtocol {
- long versionID = 1L;
-
- SmallLayeredNeuralNetworkMessage merge(SmallLayeredNeuralNetworkMessage msg);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java b/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
deleted file mode 100644
index 47aab84..0000000
--- a/src/main/java/org/apache/horn/bsp/ParameterMergerServer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hama.commons.math.DoubleMatrix;
-
-import com.google.common.base.Preconditions;
-
-public class ParameterMergerServer implements ParameterMerger {
-
- private static final Log LOG = LogFactory.getLog(ParameterMergerServer.class);
-
- /* The parameter merge base. */
- protected SmallLayeredNeuralNetwork inMemoryModel;
-
- /* To terminate or not to terminate. */
- protected AtomicBoolean isConverge;
-
- /* The number of slave works that request commits. */
- protected int SlaveCount;
-
- /* After mergeLimit, terminate whether the result is converging or not. */
- protected int mergeLimit;
-
- /*
- * last n training errors. converging is decided based on the average value of
- * these errors.
- */
- protected double[] trainingErrors;
-
- /*
- * If the average of last n training errors is smaller than this value, it is
- * converging.
- */
- protected double prevAvgTrainingError = Double.MAX_VALUE;
-
- /* current index for trainingErrors. */
- protected int curTrainingError = 0;
-
- /* how many merges have been conducted? */
- protected int mergeCount = 0;
-
- public ParameterMergerServer(SmallLayeredNeuralNetwork inMemoryModel,
- AtomicBoolean isConverge, int slaveCount, int mergeLimit,
- int convergenceCheckInterval) {
- this.inMemoryModel = inMemoryModel;
- this.isConverge = isConverge;
- this.SlaveCount = slaveCount;
- this.mergeLimit = mergeLimit;
- this.trainingErrors = new double[convergenceCheckInterval];
- }
-
- @Override
- public long getProtocolVersion(String s, long l) throws IOException {
- return ParameterMerger.versionID;
- }
-
- @Override
- public SmallLayeredNeuralNetworkMessage merge(
- SmallLayeredNeuralNetworkMessage msg) {
-
- double trainingError = msg.getTrainingError();
- DoubleMatrix[] weightUpdates = msg.getCurMatrices();
- DoubleMatrix[] prevWeightUpdates = msg.getPrevMatrices();
-
- Preconditions
- .checkArgument(weightUpdates.length == prevWeightUpdates.length);
-
- LOG.info("Start merging: " + this.mergeCount);
-
- if (!this.isConverge.get()) {
- for (int i = 0; i < weightUpdates.length; ++i) {
- weightUpdates[i] = weightUpdates[i].divide(this.SlaveCount);
- prevWeightUpdates[i] = prevWeightUpdates[i].divide(this.SlaveCount);
- }
-
- synchronized (inMemoryModel) {
- this.inMemoryModel.updateWeightMatrices(weightUpdates);
- this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
-
- // add trainingError to trainingErrors
- this.trainingErrors[this.curTrainingError++] = trainingError;
-
- // check convergence
- if (this.trainingErrors.length == this.curTrainingError) {
- double curAvgTrainingError = 0.0;
- for (int i = 0; i < this.curTrainingError; ++i) {
- curAvgTrainingError += this.trainingErrors[i];
- }
- curAvgTrainingError /= this.trainingErrors.length;
-
- if (prevAvgTrainingError < curAvgTrainingError) {
- this.isConverge.set(true);
- } else {
- // update
- prevAvgTrainingError = curAvgTrainingError;
- this.curTrainingError = 0;
- }
- }
-
- if (++this.mergeCount == this.mergeLimit) {
- this.isConverge.set(true);
- }
- }
- }
-
- return new SmallLayeredNeuralNetworkMessage(0, this.isConverge.get(),
- this.inMemoryModel.getWeightMatrices(),
- this.inMemoryModel.getPrevMatricesUpdates());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
deleted file mode 100644
index 0ea8e51..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,620 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.commons.io.MatrixWritable;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleFunction;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.util.ReflectionUtils;
-import org.apache.horn.examples.MultiLayerPerceptron.StandardNeuron;
-import org.apache.horn.funcs.FunctionFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * SmallLayeredNeuralNetwork defines the general operations for layered
- * derivative models, including Linear Regression, Logistic Regression,
- * Multilayer Perceptron, Autoencoder, Restricted Boltzmann Machine, etc. For
- * SmallLayeredNeuralNetwork, training can be conducted in parallel, but the
- * parameters of the model are assumed to be stored on a single machine.
- *
- * In general, these models consist of neurons which are aligned in layers.
- * For any two adjacent layers, the neurons are connected to form a bipartite
- * weighted graph.
- *
- */
-public class SmallLayeredNeuralNetwork extends AbstractLayeredNeuralNetwork {
-
- private static final Log LOG = LogFactory
- .getLog(SmallLayeredNeuralNetwork.class);
-
- public static Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>> neuronClass;
-
- /* Weights between neurons at adjacent layers */
- protected List<DoubleMatrix> weightMatrixList;
-
- /* Previous weight updates between neurons at adjacent layers */
- protected List<DoubleMatrix> prevWeightUpdatesList;
-
- /* Different layers can have different squashing functions */
- protected List<DoubleFunction> squashingFunctionList;
-
- protected int finalLayerIdx;
-
- protected double regularizationWeight;
-
- public SmallLayeredNeuralNetwork() {
- this.layerSizeList = Lists.newArrayList();
- this.weightMatrixList = Lists.newArrayList();
- this.prevWeightUpdatesList = Lists.newArrayList();
- this.squashingFunctionList = Lists.newArrayList();
- }
-
- public SmallLayeredNeuralNetwork(HamaConfiguration conf, String modelPath) {
- super(conf, modelPath);
- this.regularizationWeight = conf.getDouble("regularization.weight", 0);
- }
-
- @Override
- /**
- * {@inheritDoc}
- */
- public int addLayer(int size, boolean isFinalLayer,
- DoubleFunction squashingFunction) {
- Preconditions.checkArgument(size > 0,
- "Size of layer must be larger than 0.");
- if (!isFinalLayer) {
- size += 1;
- }
-
- LOG.info("Add Layer: " + size);
- this.layerSizeList.add(size);
- int layerIdx = this.layerSizeList.size() - 1;
- if (isFinalLayer) {
- this.finalLayerIdx = layerIdx;
- }
-
-    // add weights between the current layer and the previous layer; the
-    // input layer has no squashing function
- if (layerIdx > 0) {
- int sizePrevLayer = this.layerSizeList.get(layerIdx - 1);
-      // row count equals the size of the current layer and column count
-      // equals the size of the previous layer
- int row = isFinalLayer ? size : size - 1;
- int col = sizePrevLayer;
- DoubleMatrix weightMatrix = new DenseDoubleMatrix(row, col);
- // initialize weights
- weightMatrix.applyToElements(new DoubleFunction() {
- @Override
- public double apply(double value) {
- return RandomUtils.nextDouble() - 0.5;
- }
-
- @Override
- public double applyDerivative(double value) {
-          throw new UnsupportedOperationException(
-              "applyDerivative is not needed for weight initialization");
- }
- });
- this.weightMatrixList.add(weightMatrix);
- this.prevWeightUpdatesList.add(new DenseDoubleMatrix(row, col));
- this.squashingFunctionList.add(squashingFunction);
- }
- return layerIdx;
- }
-
- /**
- * Update the weight matrices with given matrices.
- *
- * @param matrices
- */
- public void updateWeightMatrices(DoubleMatrix[] matrices) {
- for (int i = 0; i < matrices.length; ++i) {
- DoubleMatrix matrix = this.weightMatrixList.get(i);
- this.weightMatrixList.set(i, matrix.add(matrices[i]));
- }
- }
-
- /**
- * Set the previous weight matrices.
- *
- * @param prevUpdates
- */
- void setPrevWeightMatrices(DoubleMatrix[] prevUpdates) {
- this.prevWeightUpdatesList.clear();
- Collections.addAll(this.prevWeightUpdatesList, prevUpdates);
- }
-
- /**
- * Add a batch of matrices onto the given destination matrices.
- *
- * @param destMatrices
- * @param sourceMatrices
- */
- static void matricesAdd(DoubleMatrix[] destMatrices,
- DoubleMatrix[] sourceMatrices) {
- for (int i = 0; i < destMatrices.length; ++i) {
- destMatrices[i] = destMatrices[i].add(sourceMatrices[i]);
- }
- }
-
- /**
- * Get all the weight matrices.
- *
- * @return The matrices in form of matrix array.
- */
- DoubleMatrix[] getWeightMatrices() {
- DoubleMatrix[] matrices = new DoubleMatrix[this.weightMatrixList.size()];
- this.weightMatrixList.toArray(matrices);
- return matrices;
- }
-
- /**
- * Set the weight matrices.
- *
- * @param matrices
- */
- public void setWeightMatrices(DoubleMatrix[] matrices) {
- this.weightMatrixList = new ArrayList<DoubleMatrix>();
- Collections.addAll(this.weightMatrixList, matrices);
- }
-
- /**
- * Get the previous matrices updates in form of array.
- *
- * @return The matrices in form of matrix array.
- */
- public DoubleMatrix[] getPrevMatricesUpdates() {
- DoubleMatrix[] prevMatricesUpdates = new DoubleMatrix[this.prevWeightUpdatesList
- .size()];
- for (int i = 0; i < this.prevWeightUpdatesList.size(); ++i) {
- prevMatricesUpdates[i] = this.prevWeightUpdatesList.get(i);
- }
- return prevMatricesUpdates;
- }
-
- public void setWeightMatrix(int index, DoubleMatrix matrix) {
- Preconditions.checkArgument(
- 0 <= index && index < this.weightMatrixList.size(), String.format(
- "index [%d] should be in range[%d, %d].", index, 0,
- this.weightMatrixList.size()));
- this.weightMatrixList.set(index, matrix);
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
-
- // read squash functions
- int squashingFunctionSize = input.readInt();
- this.squashingFunctionList = Lists.newArrayList();
- for (int i = 0; i < squashingFunctionSize; ++i) {
- this.squashingFunctionList.add(FunctionFactory
- .createDoubleFunction(WritableUtils.readString(input)));
- }
-
- // read weights and construct matrices of previous updates
- int numOfMatrices = input.readInt();
- this.weightMatrixList = Lists.newArrayList();
- this.prevWeightUpdatesList = Lists.newArrayList();
- for (int i = 0; i < numOfMatrices; ++i) {
- DoubleMatrix matrix = MatrixWritable.read(input);
- this.weightMatrixList.add(matrix);
- this.prevWeightUpdatesList.add(new DenseDoubleMatrix(
- matrix.getRowCount(), matrix.getColumnCount()));
- }
-
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- super.write(output);
-
- // write squashing functions
- output.writeInt(this.squashingFunctionList.size());
- for (DoubleFunction aSquashingFunctionList : this.squashingFunctionList) {
- WritableUtils.writeString(output,
- aSquashingFunctionList.getFunctionName());
- }
-
- // write weight matrices
- output.writeInt(this.weightMatrixList.size());
- for (DoubleMatrix aWeightMatrixList : this.weightMatrixList) {
- MatrixWritable.write(aWeightMatrixList, output);
- }
-
- // DO NOT WRITE WEIGHT UPDATE
- }
-
- @Override
- public DoubleMatrix getWeightsByLayer(int layerIdx) {
- return this.weightMatrixList.get(layerIdx);
- }
-
- /**
- * Get the output of the model according to the given feature instance.
- */
- @Override
- public DoubleVector getOutput(DoubleVector instance) {
- Preconditions.checkArgument(this.layerSizeList.get(0) - 1 == instance
- .getDimension(), String.format(
- "The dimension of input instance should be %d.",
- this.layerSizeList.get(0) - 1));
- // transform the features to another space
- DoubleVector transformedInstance = this.featureTransformer
- .transform(instance);
- // add bias feature
- DoubleVector instanceWithBias = new DenseDoubleVector(
- transformedInstance.getDimension() + 1);
- instanceWithBias.set(0, 0.99999); // set bias to be a little bit less than
- // 1.0
- for (int i = 1; i < instanceWithBias.getDimension(); ++i) {
- instanceWithBias.set(i, transformedInstance.get(i - 1));
- }
-
- List<DoubleVector> outputCache = getOutputInternal(instanceWithBias);
- // return the output of the last layer
- DoubleVector result = outputCache.get(outputCache.size() - 1);
- // remove bias
- return result.sliceUnsafe(1, result.getDimension() - 1);
- }
-
- /**
- * Calculate the output internally; the intermediate output of each layer
- * will be cached.
- *
- * @param instanceWithBias The instance containing the features.
- * @return Cached output of each layer.
- */
- public List<DoubleVector> getOutputInternal(DoubleVector instanceWithBias) {
- List<DoubleVector> outputCache = new ArrayList<DoubleVector>();
- // fill with instance
- DoubleVector intermediateOutput = instanceWithBias;
- outputCache.add(intermediateOutput);
-
- for (int i = 0; i < this.layerSizeList.size() - 1; ++i) {
- intermediateOutput = forward(i, intermediateOutput);
- outputCache.add(intermediateOutput);
- }
-
- return outputCache;
- }
-
- /**
- * @return a new neuron instance
- */
- public static Neuron<Synapse<DoubleWritable, DoubleWritable>> newNeuronInstance() {
- return (Neuron<Synapse<DoubleWritable, DoubleWritable>>) ReflectionUtils
- .newInstance(neuronClass);
- }
-
- /**
- * Forward the calculation for one layer.
- *
- * @param fromLayer The index of the previous layer.
- * @param intermediateOutput The intermediate output of the previous layer.
- * @return a new vector with the result of the operation.
- */
- protected DoubleVector forward(int fromLayer, DoubleVector intermediateOutput) {
- DoubleMatrix weightMatrix = this.weightMatrixList.get(fromLayer);
-
- neuronClass = (Class<Neuron<Synapse<DoubleWritable, DoubleWritable>>>) conf
- .getClass("neuron.class", Neuron.class);
-
- // TODO use the multithread processing
- DoubleVector vec = new DenseDoubleVector(weightMatrix.getRowCount());
- for (int row = 0; row < weightMatrix.getRowCount(); row++) {
- List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
- for (int col = 0; col < weightMatrix.getColumnCount(); col++) {
- msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
- new DoubleWritable(intermediateOutput.get(col)),
- new DoubleWritable(weightMatrix.get(row, col))));
- }
- Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
- Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
- n.setup(conf);
- n.setSquashingFunction(this.squashingFunctionList.get(fromLayer));
- try {
- n.forward(iterable);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- vec.set(row, n.getOutput());
- }
-
- // add bias
- DoubleVector vecWithBias = new DenseDoubleVector(vec.getDimension() + 1);
- vecWithBias.set(0, 1);
- for (int i = 0; i < vec.getDimension(); ++i) {
- vecWithBias.set(i + 1, vec.get(i));
- }
-
- return vecWithBias;
- }
-
- /**
- * Train the model online.
- *
- * @param trainingInstance
- */
- public void trainOnline(DoubleVector trainingInstance) {
- DoubleMatrix[] updateMatrices = this.trainByInstance(trainingInstance);
- this.updateWeightMatrices(updateMatrices);
- }
-
- @Override
- public DoubleMatrix[] trainByInstance(DoubleVector trainingInstance) {
- DoubleVector transformedVector = this.featureTransformer
- .transform(trainingInstance.sliceUnsafe(this.layerSizeList.get(0) - 1));
-
- int inputDimension = this.layerSizeList.get(0) - 1;
- int outputDimension;
- DoubleVector inputInstance = null;
- DoubleVector labels = null;
- if (this.learningStyle == LearningStyle.SUPERVISED) {
- outputDimension = this.layerSizeList.get(this.layerSizeList.size() - 1);
- // validate training instance
- Preconditions.checkArgument(
- inputDimension + outputDimension == trainingInstance.getDimension(),
- String
- .format(
- "The dimension of training instance is %d, but requires %d.",
- trainingInstance.getDimension(), inputDimension
- + outputDimension));
-
- inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
- inputInstance.set(0, 1); // add bias
- // get the features from the transformed vector
- for (int i = 0; i < inputDimension; ++i) {
- inputInstance.set(i + 1, transformedVector.get(i));
- }
- // get the labels from the original training instance
- labels = trainingInstance.sliceUnsafe(inputInstance.getDimension() - 1,
- trainingInstance.getDimension() - 1);
- } else if (this.learningStyle == LearningStyle.UNSUPERVISED) {
- // labels are identical to input features
- outputDimension = inputDimension;
- // validate training instance
- Preconditions.checkArgument(inputDimension == trainingInstance
- .getDimension(), String.format(
- "The dimension of training instance is %d, but requires %d.",
- trainingInstance.getDimension(), inputDimension));
-
- inputInstance = new DenseDoubleVector(this.layerSizeList.get(0));
- inputInstance.set(0, 1); // add bias
- // get the features from the transformed vector
- for (int i = 0; i < inputDimension; ++i) {
- inputInstance.set(i + 1, transformedVector.get(i));
- }
- // get the labels by copying the transformed vector
- labels = transformedVector.deepCopy();
- }
-
- List<DoubleVector> internalResults = this.getOutputInternal(inputInstance);
- DoubleVector output = internalResults.get(internalResults.size() - 1);
-
- // get the training error
- calculateTrainingError(labels,
- output.deepCopy().sliceUnsafe(1, output.getDimension() - 1));
-
- if (this.trainingMethod.equals(TrainingMethod.GRADIENT_DESCENT)) {
- return this.trainByInstanceGradientDescent(labels, internalResults);
- } else {
-      throw new IllegalArgumentException("Training method is not supported.");
- }
- }
-
- /**
- * Train by gradient descent. Get the updated weights using one training
- * instance.
- *
- * @param labels The expected output for the training instance.
- * @param internalResults The cached output of each layer.
- * @return The weight update matrices.
- */
- private DoubleMatrix[] trainByInstanceGradientDescent(DoubleVector labels,
- List<DoubleVector> internalResults) {
-
- DoubleVector output = internalResults.get(internalResults.size() - 1);
- // initialize weight update matrices
- DenseDoubleMatrix[] weightUpdateMatrices = new DenseDoubleMatrix[this.weightMatrixList
- .size()];
- for (int m = 0; m < weightUpdateMatrices.length; ++m) {
- weightUpdateMatrices[m] = new DenseDoubleMatrix(this.weightMatrixList
- .get(m).getRowCount(), this.weightMatrixList.get(m).getColumnCount());
- }
- DoubleVector deltaVec = new DenseDoubleVector(
- this.layerSizeList.get(this.layerSizeList.size() - 1));
-
- DoubleFunction squashingFunction = this.squashingFunctionList
- .get(this.squashingFunctionList.size() - 1);
-
- DoubleMatrix lastWeightMatrix = this.weightMatrixList
- .get(this.weightMatrixList.size() - 1);
- for (int i = 0; i < deltaVec.getDimension(); ++i) {
- double costFuncDerivative = this.costFunction.applyDerivative(
- labels.get(i), output.get(i + 1));
- // add regularization
- costFuncDerivative += this.regularizationWeight
- * lastWeightMatrix.getRowVector(i).sum();
- deltaVec.set(
- i,
- costFuncDerivative
- * squashingFunction.applyDerivative(output.get(i + 1)));
- }
-
-    // start from the layer just before the output layer
- for (int layer = this.layerSizeList.size() - 2; layer >= 0; --layer) {
- output = internalResults.get(layer);
- deltaVec = backpropagate(layer, deltaVec, internalResults,
- weightUpdateMatrices[layer]);
- }
-
- this.setPrevWeightMatrices(weightUpdateMatrices);
-
- return weightUpdateMatrices;
- }
-
- /**
- * Back-propagate the errors from the next layer to the current layer. The
- * weight update information is stored in weightUpdateMatrix, and the delta
- * of the current layer is returned.
- *
- * @param curLayerIdx Index of the current layer.
- * @param nextLayerDelta Delta of the next layer.
- * @param outputCache Cached output of each layer.
- * @param weightUpdateMatrix Destination for the computed weight updates.
- * @return the delta of the current layer.
- */
- private DoubleVector backpropagate(int curLayerIdx,
- DoubleVector nextLayerDelta, List<DoubleVector> outputCache,
- DenseDoubleMatrix weightUpdateMatrix) {
-
- // get layer related information
- DoubleVector curLayerOutput = outputCache.get(curLayerIdx);
- DoubleMatrix weightMatrix = this.weightMatrixList.get(curLayerIdx);
- DoubleMatrix prevWeightMatrix = this.prevWeightUpdatesList.get(curLayerIdx);
-
-    // if the next layer is not the output layer, remove the delta of the
-    // bias neuron
- if (curLayerIdx != this.layerSizeList.size() - 2) {
- nextLayerDelta = nextLayerDelta.slice(1,
- nextLayerDelta.getDimension() - 1);
- }
-
- // DoubleMatrix transposed = weightMatrix.transpose();
- DoubleVector deltaVector = new DenseDoubleVector(
- weightMatrix.getColumnCount());
- for (int row = 0; row < weightMatrix.getColumnCount(); ++row) {
- Neuron<Synapse<DoubleWritable, DoubleWritable>> n = newNeuronInstance();
- // calls setup method
- n.setup(conf);
- n.setSquashingFunction(this.squashingFunctionList.get(curLayerIdx));
- n.setOutput(curLayerOutput.get(row));
-
- List<Synapse<DoubleWritable, DoubleWritable>> msgs = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
-
- n.setWeightVector(weightMatrix.getRowCount());
-
- for (int col = 0; col < weightMatrix.getRowCount(); ++col) {
- // sum += (transposed.get(row, col) * nextLayerDelta.get(col));
- msgs.add(new Synapse<DoubleWritable, DoubleWritable>(
- new DoubleWritable(nextLayerDelta.get(col)), new DoubleWritable(
- weightMatrix.get(col, row)), new DoubleWritable(
- prevWeightMatrix.get(col, row))));
- }
-
- Iterable<Synapse<DoubleWritable, DoubleWritable>> iterable = msgs;
- try {
- n.backward(iterable);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // update weights
- weightUpdateMatrix.setColumn(row, n.getWeights());
- deltaVector.set(row, n.getDelta());
- }
-
- return deltaVector;
- }
-
- @Override
- protected BSPJob trainInternal(HamaConfiguration hamaConf)
- throws IOException, InterruptedException, ClassNotFoundException {
- this.conf = hamaConf;
- this.fs = FileSystem.get(conf);
-
- String modelPath = conf.get("model.path");
- if (modelPath != null) {
- this.modelPath = modelPath;
- }
- // modelPath must be set before training
- if (this.modelPath == null) {
- throw new IllegalArgumentException(
- "Please specify the modelPath for model, "
- + "either through setModelPath() or add 'modelPath' to the training parameters.");
- }
- this.writeModelToFile();
-
- // create job
- BSPJob job = new BSPJob(conf, SmallLayeredNeuralNetworkTrainer.class);
- job.setJobName("Small scale Neural Network training");
- job.setJarByClass(SmallLayeredNeuralNetworkTrainer.class);
- job.setBspClass(SmallLayeredNeuralNetworkTrainer.class);
-
- job.getConfiguration().setClass("neuron.class", StandardNeuron.class,
- Neuron.class);
-
- // additional for parameter server
- // TODO at this moment, we use 1 task as a parameter server
- // In the future, the number of parameter server should be configurable
- job.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
-
- job.setInputPath(new Path(conf.get("training.input.path")));
- job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
- job.setInputKeyClass(LongWritable.class);
- job.setInputValueClass(VectorWritable.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(NullWritable.class);
- job.setOutputFormat(org.apache.hama.bsp.NullOutputFormat.class);
-
- return job;
- }
-
- @Override
- protected void calculateTrainingError(DoubleVector labels, DoubleVector output) {
- DoubleVector errors = labels.deepCopy().applyToElements(output,
- this.costFunction);
- this.trainingError = errors.sum();
- }
-
- /**
- * Get the squashing function of a specified layer.
- *
- * @param idx Index of the layer.
- * @return the squashing function of the specified layer.
- */
- public DoubleFunction getSquashingFunction(int idx) {
- return this.squashingFunctionList.get(idx);
- }
-
-}
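
The forward() step above computes, for each neuron of the target layer, the
weighted sum of the previous layer's bias-augmented output, squashes it, and
prepends a fresh bias unit for the next layer. A plain-array sketch of that
computation (a hypothetical helper, not part of the Horn code base; the demo
assumes a sigmoid squashing function):

    import java.util.function.DoubleUnaryOperator;

    public class ForwardSketch {
      // weights[row][col]: row indexes the current layer's neurons,
      // col indexes the previous layer's outputs (col 0 is the bias).
      static double[] forward(double[][] weights, double[] prevOutputWithBias,
          DoubleUnaryOperator squash) {
        double[] out = new double[weights.length + 1];
        out[0] = 1.0; // bias unit prepended for the next layer
        for (int row = 0; row < weights.length; row++) {
          double sum = 0.0;
          for (int col = 0; col < weights[row].length; col++) {
            sum += weights[row][col] * prevOutputWithBias[col];
          }
          out[row + 1] = squash.applyAsDouble(sum);
        }
        return out;
      }

      public static void main(String[] args) {
        double[][] w = { { 0.5, 0.5, 0.5 } };  // 1 neuron, bias + 2 inputs
        double[] in = { 1.0, 0.0, 1.0 };       // bias, x1, x2
        double[] out = forward(w, in, x -> 1.0 / (1.0 + Math.exp(-x)));
        System.out.println(out[1]);            // sigmoid(1.0), about 0.731
      }
    }
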
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
deleted file mode 100644
index 2f8c287..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkMessage.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.commons.io.MatrixWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-
-/**
- * SmallLayeredNeuralNetworkMessage transmits the messages between peers
- * during the training of neural networks.
- *
- */
-public class SmallLayeredNeuralNetworkMessage implements Writable {
-
- protected double trainingError;
- protected DoubleMatrix[] curMatrices;
- protected DoubleMatrix[] prevMatrices;
- protected boolean converge;
-
- public SmallLayeredNeuralNetworkMessage() {
- }
-
- public SmallLayeredNeuralNetworkMessage(double trainingError,
- boolean converge, DoubleMatrix[] weightMatrices,
- DoubleMatrix[] prevMatrices) {
- this.trainingError = trainingError;
- this.converge = converge;
- this.curMatrices = weightMatrices;
- this.prevMatrices = prevMatrices;
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- trainingError = input.readDouble();
- converge = input.readBoolean();
- int numMatrices = input.readInt();
- boolean hasPrevMatrices = input.readBoolean();
- curMatrices = new DenseDoubleMatrix[numMatrices];
-    // read matrix updates
- for (int i = 0; i < curMatrices.length; ++i) {
- curMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
- }
-
- if (hasPrevMatrices) {
- prevMatrices = new DenseDoubleMatrix[numMatrices];
- // read previous matrices updates
- for (int i = 0; i < prevMatrices.length; ++i) {
- prevMatrices[i] = (DenseDoubleMatrix) MatrixWritable.read(input);
- }
- }
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeDouble(trainingError);
- output.writeBoolean(converge);
- output.writeInt(curMatrices.length);
- if (prevMatrices == null) {
- output.writeBoolean(false);
- } else {
- output.writeBoolean(true);
- }
- for (DoubleMatrix matrix : curMatrices) {
- MatrixWritable.write(matrix, output);
- }
- if (prevMatrices != null) {
- for (DoubleMatrix matrix : prevMatrices) {
- MatrixWritable.write(matrix, output);
- }
- }
- }
-
- public double getTrainingError() {
- return trainingError;
- }
-
- public void setTrainingError(double trainingError) {
- this.trainingError = trainingError;
- }
-
- public boolean isConverge() {
- return converge;
- }
-
- public void setConverge(boolean converge) {
- this.converge = converge;
- }
-
- public DoubleMatrix[] getCurMatrices() {
- return curMatrices;
- }
-
- public void setMatrices(DoubleMatrix[] curMatrices) {
- this.curMatrices = curMatrices;
- }
-
- public DoubleMatrix[] getPrevMatrices() {
- return prevMatrices;
- }
-
- public void setPrevMatrices(DoubleMatrix[] prevMatrices) {
- this.prevMatrices = prevMatrices;
- }
-
-}
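
The write() method above records a boolean flag before the matrices so that
readFields() knows whether a prevMatrices block follows. A round-trip sketch
of that contract (a sketch only; it assumes Hadoop's DataOutputBuffer and
DataInputBuffer helpers, and the values are illustrative):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hama.commons.math.DenseDoubleMatrix;
    import org.apache.hama.commons.math.DoubleMatrix;

    public class MessageRoundTrip {
      public static void main(String[] args) throws Exception {
        DoubleMatrix[] cur = { new DenseDoubleMatrix(2, 3, 0.1) };
        // prevMatrices == null: write() records a 'false' flag, so
        // readFields() skips the second matrix block entirely.
        SmallLayeredNeuralNetworkMessage out =
            new SmallLayeredNeuralNetworkMessage(0.5, false, cur, null);

        DataOutputBuffer outBuf = new DataOutputBuffer();
        out.write(outBuf);

        DataInputBuffer inBuf = new DataInputBuffer();
        inBuf.reset(outBuf.getData(), outBuf.getLength());

        SmallLayeredNeuralNetworkMessage in =
            new SmallLayeredNeuralNetworkMessage();
        in.readFields(inBuf);
        System.out.println(in.getTrainingError()); // 0.5
      }
    }
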
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java b/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
deleted file mode 100644
index c3e258c..0000000
--- a/src/main/java/org/apache/horn/bsp/SmallLayeredNeuralNetworkTrainer.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSP;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ipc.RPC;
-
-import com.google.common.base.Preconditions;
-
-/**
- * The trainer that trains the {@link SmallLayeredNeuralNetwork} based on
- * the BSP framework.
- *
- */
-public final class SmallLayeredNeuralNetworkTrainer
- extends
- BSP<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> {
-
- private static final Log LOG = LogFactory
- .getLog(SmallLayeredNeuralNetworkTrainer.class);
-
- /* On the master worker: the base model for parameter merging */
- /* On a slave worker: the neural network being trained */
- private SmallLayeredNeuralNetwork inMemoryModel;
-
- /* Job configuration */
- private HamaConfiguration conf;
-
- /* Default batch size */
- private int batchSize;
-
- /* whether it is converging or not */
- private AtomicBoolean isConverge;
-
- /* On the master worker: the asynchronous parameter merger (RPC server) */
- /* On a slave worker: null */
- private RPC.Server merger;
-
- /* On the master worker: null */
- /* On a slave worker: the proxy to the asynchronous parameter merger */
- private ParameterMerger proxy;
-
- /**
- * Returns true if this worker is the master worker.
- *
- * @param peer the BSP peer to check
- */
- private boolean isMaster(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
- return peer.getPeerIndex() == peer.getNumPeers() - 1;
- }
-
- @Override
- /**
- * If the model path is specified, load the existing model from the storage
- * location.
- */
- public void setup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
- // At least one master & slave worker exist.
- Preconditions.checkArgument(peer.getNumPeers() >= 2);
- this.conf = peer.getConfiguration();
-
- String modelPath = conf.get("model.path");
- this.inMemoryModel = new SmallLayeredNeuralNetwork(conf, modelPath);
-
- this.batchSize = conf.getInt("training.batch.size", 50);
- this.isConverge = new AtomicBoolean(false);
-
- int slaveCount = peer.getNumPeers() - 1;
- int mergeLimit = conf.getInt("training.max.iterations", 100000);
- int convergenceCheckInterval = peer.getNumPeers()
- * conf.getInt("convergence.check.interval", 2000);
- String master = peer.getPeerName();
- String masterAddr = master.substring(0, master.indexOf(':'));
- int port = conf.getInt("sync.server.port", 40052);
-
- if (isMaster(peer)) {
- try {
- this.merger = RPC.getServer(new ParameterMergerServer(inMemoryModel,
- isConverge, slaveCount, mergeLimit, convergenceCheckInterval),
- masterAddr, port, conf);
- merger.start();
- } catch (IOException e) {
- e.printStackTrace();
- }
- LOG.info("Begin to train");
- } else {
- InetSocketAddress addr = new InetSocketAddress(masterAddr, port);
- try {
- this.proxy = (ParameterMerger) RPC.getProxy(ParameterMerger.class,
- ParameterMerger.versionID, addr, conf);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- /**
- * Write the trained model back to stored location.
- */
- public void cleanup(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer) {
- // write model to modelPath
- if (isMaster(peer)) {
- try {
- LOG.info("Write model back to " + inMemoryModel.getModelPath());
- this.inMemoryModel.writeModelToFile();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public void bsp(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer)
- throws IOException, SyncException, InterruptedException {
- while (!this.isConverge.get()) {
-      // each slave worker calculates the matrix updates according to its
-      // local data partition and merges them with the master
- if (!isMaster(peer)) {
- calculateUpdates(peer);
- }
- }
-
- if (isMaster(peer)) {
- merger.stop();
- }
- peer.sync(); // finalize the bsp program.
- }
-
- /**
- * Calculate the matrix updates according to the local partition of data.
- *
- * @param peer
- * @throws IOException
- */
- private void calculateUpdates(
- BSPPeer<LongWritable, VectorWritable, NullWritable, NullWritable, SmallLayeredNeuralNetworkMessage> peer)
- throws IOException {
-
- DoubleMatrix[] weightUpdates = new DoubleMatrix[this.inMemoryModel.weightMatrixList
- .size()];
- for (int i = 0; i < weightUpdates.length; ++i) {
- int row = this.inMemoryModel.weightMatrixList.get(i).getRowCount();
- int col = this.inMemoryModel.weightMatrixList.get(i).getColumnCount();
- weightUpdates[i] = new DenseDoubleMatrix(row, col);
- }
-
- // continue to train
- double avgTrainingError = 0.0;
- LongWritable key = new LongWritable();
- VectorWritable value = new VectorWritable();
- for (int recordsRead = 0; recordsRead < batchSize; ++recordsRead) {
- if (!peer.readNext(key, value)) {
- peer.reopenInput();
- peer.readNext(key, value);
- }
- DoubleVector trainingInstance = value.getVector();
- SmallLayeredNeuralNetwork.matricesAdd(weightUpdates,
- this.inMemoryModel.trainByInstance(trainingInstance));
- avgTrainingError += this.inMemoryModel.trainingError;
- }
- avgTrainingError /= batchSize;
-
- // calculate the average of updates
- for (int i = 0; i < weightUpdates.length; ++i) {
- weightUpdates[i] = weightUpdates[i].divide(batchSize);
- }
-
- // exchange parameter update with master
- SmallLayeredNeuralNetworkMessage msg = new SmallLayeredNeuralNetworkMessage(
- avgTrainingError, false, weightUpdates,
- this.inMemoryModel.getPrevMatricesUpdates());
-
- SmallLayeredNeuralNetworkMessage inMessage = proxy.merge(msg);
- DoubleMatrix[] newWeights = inMessage.getCurMatrices();
-    DoubleMatrix[] prevWeightUpdates = inMessage.getPrevMatrices();
-    this.inMemoryModel.setWeightMatrices(newWeights);
-    this.inMemoryModel.setPrevWeightMatrices(prevWeightUpdates);
- this.isConverge.set(inMessage.isConverge());
- }
-
-}
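
setup() above wires the master/slave split entirely through the job
configuration. For reference, a sketch of the keys the trainer reads,
alongside the defaults used in the code (the values set here are
illustrative, not recommendations):

    import org.apache.hama.HamaConfiguration;

    public class TrainerConfigSketch {
      public static void main(String[] args) {
        HamaConfiguration conf = new HamaConfiguration();
        conf.set("model.path", "/tmp/mlp-model");        // required
        conf.setInt("training.batch.size", 300);         // default 50
        conf.setInt("training.max.iterations", 100000);  // merge limit
        conf.setInt("convergence.check.interval", 2000); // error window
        conf.setInt("sync.server.port", 40052);          // merger RPC port
        // One extra BSP task is reserved as the parameter server, so a
        // job with N peers trains on N-1 data partitions.
      }
    }
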
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/bsp/Synapse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/bsp/Synapse.java b/src/main/java/org/apache/horn/bsp/Synapse.java
deleted file mode 100644
index 61725f9..0000000
--- a/src/main/java/org/apache/horn/bsp/Synapse.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Message wrapper for a propagating message
- */
-public class Synapse<M extends Writable, W extends Writable> implements
- Writable {
-
- DoubleWritable message;
- DoubleWritable weight;
- DoubleWritable prevWeight;
-
- public Synapse(DoubleWritable message, DoubleWritable weight) {
- this.message = message;
- this.weight = weight;
- }
-
- public Synapse(DoubleWritable message, DoubleWritable weight, DoubleWritable prevWeight) {
- this.message = message;
- this.weight = weight;
- this.prevWeight = prevWeight;
- }
-
- /**
- * @return the activation or error message
- */
- public double getMessage() {
- return message.get();
- }
-
- public double getInput() {
- // returns the input
- return message.get();
- }
-
- public double getDelta() {
- // returns the delta
- return message.get();
- }
-
- public double getWeight() {
- return weight.get();
- }
-
- public double getPrevWeight() {
- return prevWeight.get();
- }
-
- @Override
-  public void readFields(DataInput in) throws IOException {
-    // instantiate before reading; these fields are null unless a
-    // constructor that sets them was used
-    message = new DoubleWritable();
-    weight = new DoubleWritable();
-    message.readFields(in);
-    weight.readFields(in);
-    // note: prevWeight is not part of the serialized form
-  }
-
- @Override
- public void write(DataOutput out) throws IOException {
- message.write(out);
- weight.write(out);
- }
-
-}
\ No newline at end of file
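
The same Synapse type carries two kinds of payloads: an activation plus its
connecting weight on the forward pass, and a delta plus the weight (and the
previous weight update) on the backward pass. A short usage sketch (the
values are illustrative):

    import org.apache.hadoop.io.DoubleWritable;

    public class SynapseSketch {
      public static void main(String[] args) {
        // forward: (input from the previous layer, connecting weight)
        Synapse<DoubleWritable, DoubleWritable> fwd =
            new Synapse<DoubleWritable, DoubleWritable>(
                new DoubleWritable(0.7), new DoubleWritable(0.5));
        System.out.println(fwd.getInput() * fwd.getWeight());  // 0.35

        // backward: (delta from next layer, weight, prev weight update)
        Synapse<DoubleWritable, DoubleWritable> bwd =
            new Synapse<DoubleWritable, DoubleWritable>(
                new DoubleWritable(0.1), new DoubleWritable(0.5),
                new DoubleWritable(0.02));
        System.out.println(bwd.getDelta() * bwd.getWeight());  // 0.05
      }
    }
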
[2/4] incubator-horn git commit: Code refactoring
Posted by ed...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/main/java/org/apache/horn/examples/NeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/horn/examples/NeuralNetwork.java b/src/main/java/org/apache/horn/examples/NeuralNetwork.java
deleted file mode 100644
index 5c0afdf..0000000
--- a/src/main/java/org/apache/horn/examples/NeuralNetwork.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.examples;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.horn.bsp.SmallLayeredNeuralNetwork;
-import org.apache.horn.funcs.FunctionFactory;
-
-/**
- * The example of using {@link SmallLayeredNeuralNetwork}, including the
- * training phase and labeling phase.
- */
-public class NeuralNetwork {
-
- public static void main(String[] args) throws Exception {
- if (args.length < 3) {
- printUsage();
- return;
- }
- HamaConfiguration conf = new HamaConfiguration();
- String mode = args[0];
-
- if (mode.equalsIgnoreCase("label")) {
- if (args.length < 4) {
- printUsage();
- return;
- }
-
- String featureDataPath = args[1];
- String resultDataPath = args[2];
- String modelPath = args[3];
-
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork(conf, modelPath);
-
- // process data in streaming approach
- FileSystem fs = FileSystem.get(new URI(featureDataPath), conf);
- BufferedReader br = new BufferedReader(new InputStreamReader(
- fs.open(new Path(featureDataPath))));
- Path outputPath = new Path(resultDataPath);
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
- fs.create(outputPath)));
-
- String line = null;
-
- while ((line = br.readLine()) != null) {
- if (line.trim().length() == 0) {
- continue;
- }
- String[] tokens = line.trim().split(",");
- double[] vals = new double[tokens.length];
- for (int i = 0; i < tokens.length; ++i) {
- vals[i] = Double.parseDouble(tokens[i]);
- }
- DoubleVector instance = new DenseDoubleVector(vals);
- DoubleVector result = ann.getOutput(instance);
- double[] arrResult = result.toArray();
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < arrResult.length; ++i) {
- sb.append(arrResult[i]);
- if (i != arrResult.length - 1) {
- sb.append(",");
- } else {
- sb.append("\n");
- }
- }
- bw.write(sb.toString());
- }
-
- br.close();
- bw.close();
- } else if (mode.equals("train")) {
- if (args.length < 5) {
- printUsage();
- return;
- }
-
- String trainingDataPath = args[1];
- String trainedModelPath = args[2];
-
- int featureDimension = Integer.parseInt(args[3]);
- int labelDimension = Integer.parseInt(args[4]);
-
- int iteration = 1000;
- double learningRate = 0.4;
- double momemtumWeight = 0.2;
- double regularizationWeight = 0.01;
-
- // parse parameters
- if (args.length >= 6) {
- try {
- iteration = Integer.parseInt(args[5]);
- System.out.printf("Iteration: %d\n", iteration);
- } catch (NumberFormatException e) {
- System.err
- .println("MAX_ITERATION format invalid. It should be a positive number.");
- return;
- }
- }
- if (args.length >= 7) {
- try {
- learningRate = Double.parseDouble(args[6]);
- System.out.printf("Learning rate: %f\n", learningRate);
- } catch (NumberFormatException e) {
- System.err
- .println("LEARNING_RATE format invalid. It should be a positive double in range (0, 1.0)");
- return;
- }
- }
- if (args.length >= 8) {
- try {
- momemtumWeight = Double.parseDouble(args[7]);
- System.out.printf("Momemtum weight: %f\n", momemtumWeight);
- } catch (NumberFormatException e) {
- System.err
- .println("MOMEMTUM_WEIGHT format invalid. It should be a positive double in range (0, 1.0)");
- return;
- }
- }
- if (args.length >= 9) {
- try {
- regularizationWeight = Double.parseDouble(args[8]);
- System.out
- .printf("Regularization weight: %f\n", regularizationWeight);
- } catch (NumberFormatException e) {
- System.err
- .println("REGULARIZATION_WEIGHT format invalid. It should be a positive double in range (0, 1.0)");
- return;
- }
- }
-
- // train the model
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
- // ann.setLearningRate(learningRate);
- // ann.setMomemtumWeight(momemtumWeight);
- // ann.setRegularizationWeight(regularizationWeight);
- ann.addLayer(featureDimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(featureDimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(labelDimension, true,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("CrossEntropy"));
- ann.setModelPath(trainedModelPath);
-
- Map<String, String> trainingParameters = new HashMap<String, String>();
- trainingParameters.put("tasks", "2");
- trainingParameters.put("training.max.iterations", "" + iteration);
- trainingParameters.put("training.batch.size", "300");
- trainingParameters.put("convergence.check.interval", "1000");
- // ann.train(conf, new Path(trainingDataPath), trainingParameters);
- }
-
- }
-
- private static void printUsage() {
- System.out
- .println("USAGE: <MODE> <INPUT_PATH> <OUTPUT_PATH> <MODEL_PATH>|<FEATURE_DIMENSION> <LABEL_DIMENSION> [<MAX_ITERATION> <LEARNING_RATE> <MOMEMTUM_WEIGHT> <REGULARIZATION_WEIGHT>]");
- System.out
- .println("\tMODE\t- train: train the model with given training data.");
- System.out
- .println("\t\t- label: obtain the result by feeding the features to the neural network.");
- System.out
- .println("\tINPUT_PATH\tin 'train' mode, it is the path of the training data; in 'label' mode, it is the path of the to be evaluated data that lacks the label.");
- System.out
- .println("\tOUTPUT_PATH\tin 'train' mode, it is where the trained model is stored; in 'label' mode, it is where the labeled data is stored.");
- System.out.println("\n\tConditional Parameters:");
- System.out
- .println("\tMODEL_PATH\tonly required in 'label' mode. It specifies where to load the trained neural network model.");
- System.out
- .println("\tMAX_ITERATION\tonly used in 'train' mode. It specifies how many iterations for the neural network to run. Default is 0.01.");
- System.out
- .println("\tLEARNING_RATE\tonly used to 'train' mode. It specifies the degree of aggregation for learning, usually in range (0, 1.0). Default is 0.1.");
- System.out
- .println("\tMOMEMTUM_WEIGHT\tonly used to 'train' mode. It specifies the weight of momemtum. Default is 0.");
- System.out
- .println("\tREGULARIZATION_WEIGHT\tonly required in 'train' model. It specifies the weight of reqularization.");
- System.out.println("\nExample:");
- System.out
- .println("Train a neural network with with feature dimension 8, label dimension 1 and default setting:\n\tneuralnets train hdfs://localhost:30002/training_data hdfs://localhost:30002/model 8 1");
- System.out
- .println("Train a neural network with with feature dimension 8, label dimension 1 and specify learning rate as 0.1, momemtum rate as 0.2, and regularization weight as 0.01:\n\tneuralnets.train hdfs://localhost:30002/training_data hdfs://localhost:30002/model 8 1 0.1 0.2 0.01");
- System.out
- .println("Label the data with trained model:\n\tneuralnets evaluate hdfs://localhost:30002/unlabeled_data hdfs://localhost:30002/result hdfs://localhost:30002/model");
- }
-
-}
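
Taken together with the usage text, the two modes would be invoked roughly
as follows (the "neuralnets" wrapper name comes from the usage examples
above; whether it exists as an installed script depends on the deployment,
and the paths and dimensions are placeholders):

    # train: 8 input features, 1 label, 1000 iterations, default rates
    neuralnets train hdfs://localhost:30002/training_data \
        hdfs://localhost:30002/model 8 1 1000

    # label: feed unlabeled features through the trained model
    neuralnets label hdfs://localhost:30002/unlabeled_data \
        hdfs://localhost:30002/result hdfs://localhost:30002/model
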
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/bsp/MLTestBase.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/bsp/MLTestBase.java b/src/test/java/org/apache/horn/bsp/MLTestBase.java
deleted file mode 100644
index 8001bcf..0000000
--- a/src/test/java/org/apache/horn/bsp/MLTestBase.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * The common methods for testing machine learning algorithms
- *
- */
-public abstract class MLTestBase {
-
- /**
- * Conduct the 0-1 normalization.
- *
- * @param instances
- */
- protected static void zeroOneNormalization(List<double[]> instanceList,
- int len) {
- int dimension = len;
-
- double[] mins = new double[dimension];
- double[] maxs = new double[dimension];
- Arrays.fill(mins, Double.MAX_VALUE);
-    // Double.MIN_VALUE is the smallest positive double, which would break
-    // max-tracking for negative inputs; use the true lower bound instead
-    Arrays.fill(maxs, -Double.MAX_VALUE);
-
- for (double[] instance : instanceList) {
- for (int i = 0; i < len; ++i) {
- if (mins[i] > instance[i]) {
- mins[i] = instance[i];
- }
- if (maxs[i] < instance[i]) {
- maxs[i] = instance[i];
- }
- }
- }
-
- for (double[] instance : instanceList) {
- for (int i = 0; i < len; ++i) {
- double range = maxs[i] - mins[i];
- if (range != 0) {
- instance[i] = (instance[i] - mins[i]) / range;
- }
- }
- }
-
- }
-}
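
zeroOneNormalization() applies the standard min-max rescaling
x' = (x - min) / (max - min) independently to each dimension, leaving a
dimension untouched when all of its values are equal. A tiny worked example
of that formula (illustrative only):

    public class NormalizationSketch {
      public static void main(String[] args) {
        double[] xs = { 2.0, 4.0, 6.0 };       // one dimension of a dataset
        double min = 2.0, max = 6.0;           // per-dimension extremes
        for (double x : xs) {
          System.out.println((x - min) / (max - min)); // 0.0, 0.5, 1.0
        }
      }
    }
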
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java b/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java
deleted file mode 100644
index a42fd72..0000000
--- a/src/test/java/org/apache/horn/bsp/TestAutoEncoder.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleVector;
-import org.junit.Test;
-import org.mortbay.log.Log;
-
-/**
- * Test the functionality of {@link AutoEncoder}.
- *
- */
-public class TestAutoEncoder extends MLTestBase {
-
- @Test
- public void testAutoEncoderSimple() {
- double[][] instances = { { 0, 0, 0, 1 }, { 0, 0, 1, 0 }, { 0, 1, 0, 0 },
- { 0, 0, 0, 0 } };
- AutoEncoder encoder = new AutoEncoder(4, 2);
- // TODO use the configuration
-
- // encoder.setLearningRate(0.5);
- // encoder.setMomemtumWeight(0.2);
-
- int maxIteration = 2000;
- Random rnd = new Random();
- for (int iteration = 0; iteration < maxIteration; ++iteration) {
- for (int i = 0; i < instances.length; ++i) {
- encoder.trainOnline(new DenseDoubleVector(instances[rnd
- .nextInt(instances.length)]));
- }
- }
-
- for (int i = 0; i < instances.length; ++i) {
- DoubleVector encodeVec = encoder.encode(new DenseDoubleVector(
- instances[i]));
- DoubleVector decodeVec = encoder.decode(encodeVec);
- for (int d = 0; d < instances[i].length; ++d) {
- assertEquals(instances[i][d], decodeVec.get(d), 0.1);
- }
- }
-
- }
-
- @Test
- public void testAutoEncoderSwissRollDataset() {
- List<double[]> instanceList = new ArrayList<double[]>();
- try {
- BufferedReader br = new BufferedReader(new FileReader(
- "src/test/resources/dimensional_reduction.txt"));
- String line = null;
- while ((line = br.readLine()) != null) {
- String[] tokens = line.split("\t");
- double[] instance = new double[tokens.length];
- for (int i = 0; i < instance.length; ++i) {
- instance[i] = Double.parseDouble(tokens[i]);
- }
- instanceList.add(instance);
- }
- br.close();
- // normalize instances
- zeroOneNormalization(instanceList, instanceList.get(0).length);
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (NumberFormatException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- List<DoubleVector> vecInstanceList = new ArrayList<DoubleVector>();
- for (double[] instance : instanceList) {
- vecInstanceList.add(new DenseDoubleVector(instance));
- }
- AutoEncoder encoder = new AutoEncoder(3, 2);
- // encoder.setLearningRate(0.05);
- // encoder.setMomemtumWeight(0.1);
- int maxIteration = 2000;
- for (int iteration = 0; iteration < maxIteration; ++iteration) {
- for (DoubleVector vector : vecInstanceList) {
- encoder.trainOnline(vector);
- }
- }
-
- double errorInstance = 0;
- for (DoubleVector vector : vecInstanceList) {
- DoubleVector decoded = encoder.getOutput(vector);
- DoubleVector diff = vector.subtract(decoded);
- double error = diff.dot(diff);
- if (error > 0.1) {
- ++errorInstance;
- }
- }
- Log.info(String.format("Autoecoder error rate: %f%%\n", errorInstance * 100
- / vecInstanceList.size()));
-
- }
-
- @Test
- public void testAutoEncoderSwissRollDatasetDistributed() {
- HamaConfiguration conf = new HamaConfiguration();
- String strDataPath = "/tmp/dimensional_reduction.txt";
- Path path = new Path(strDataPath);
- List<double[]> instanceList = new ArrayList<double[]>();
- try {
- FileSystem fs = FileSystem.get(new URI(strDataPath), conf);
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
-
- String line = null;
- BufferedReader br = new BufferedReader(new FileReader(
- "src/test/resources/dimensional_reduction.txt"));
- while ((line = br.readLine()) != null) {
- String[] tokens = line.split("\t");
- double[] instance = new double[tokens.length];
- for (int i = 0; i < instance.length; ++i) {
- instance[i] = Double.parseDouble(tokens[i]);
- }
- instanceList.add(instance);
- }
- br.close();
- // normalize instances
- zeroOneNormalization(instanceList, instanceList.get(0).length);
-
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
- LongWritable.class, VectorWritable.class);
- for (int i = 0; i < instanceList.size(); ++i) {
- DoubleVector vector = new DenseDoubleVector(instanceList.get(i));
- writer.append(new LongWritable(i), new VectorWritable(vector));
- }
-
- writer.close();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
-
- AutoEncoder encoder = new AutoEncoder(3, 2);
- String modelPath = "/tmp/autoencoder-modelpath";
- encoder.setModelPath(modelPath);
- Map<String, String> trainingParams = new HashMap<String, String>();
- // encoder.setLearningRate(0.5);
- trainingParams.put("tasks", "5");
- trainingParams.put("training.max.iterations", "3000");
- trainingParams.put("training.batch.size", "200");
- // encoder.train(conf, path, trainingParams);
-
- double errorInstance = 0;
- for (double[] instance : instanceList) {
- DoubleVector vector = new DenseDoubleVector(instance);
- DoubleVector decoded = encoder.getOutput(vector);
- DoubleVector diff = vector.subtract(decoded);
- double error = diff.dot(diff);
- if (error > 0.1) {
- ++errorInstance;
- }
- }
- Log.info(String.format("Autoecoder error rate: %f%%\n", errorInstance * 100
- / instanceList.size()));
- }
-
-}
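
Both Swiss-roll tests above score the autoencoder by the per-instance
squared reconstruction error diff.dot(diff), counting an instance as an
error when it exceeds 0.1. The same measure in plain arrays (a sketch; the
vectors are made up for the demo):

    public class ReconstructionError {
      // error = ||x - xHat||^2, where xHat = decode(encode(x))
      static double squaredError(double[] x, double[] xHat) {
        double err = 0.0;
        for (int i = 0; i < x.length; i++) {
          double d = x[i] - xHat[i];
          err += d * d;
        }
        return err;
      }

      public static void main(String[] args) {
        double[] x    = { 0.2, 0.5, 0.9 };
        double[] xHat = { 0.25, 0.45, 0.88 };
        System.out.println(squaredError(x, xHat) < 0.1); // true: a hit
      }
    }
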
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java b/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java
deleted file mode 100644
index ee48136..0000000
--- a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetwork.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.commons.io.VectorWritable;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DenseDoubleVector;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.apache.hama.commons.math.DoubleVector;
-import org.apache.hama.ml.util.DefaultFeatureTransformer;
-import org.apache.hama.ml.util.FeatureTransformer;
-import org.apache.horn.bsp.AbstractLayeredNeuralNetwork.LearningStyle;
-import org.apache.horn.bsp.AbstractLayeredNeuralNetwork.TrainingMethod;
-import org.apache.horn.funcs.FunctionFactory;
-import org.junit.Test;
-import org.mortbay.log.Log;
-
-/**
- * Test the functionality of SmallLayeredNeuralNetwork.
- *
- */
-public class TestSmallLayeredNeuralNetwork extends MLTestBase {
-
- @Test
- public void testReadWrite() {
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
- ann.addLayer(2, false,
- FunctionFactory.createDoubleFunction("IdentityFunction"));
- ann.addLayer(5, false,
- FunctionFactory.createDoubleFunction("IdentityFunction"));
- ann.addLayer(1, true,
- FunctionFactory.createDoubleFunction("IdentityFunction"));
- ann.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("SquaredError"));
- double learningRate = 0.2;
- // ann.setLearningRate(learningRate);
- double momentumWeight = 0.5;
- // ann.setMomemtumWeight(momentumWeight);
- double regularizationWeight = 0.05;
- // ann.setRegularizationWeight(regularizationWeight);
- // intentionally initialize all weights to fixed values (0.2 and 0.8)
- DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
- matrices[0] = new DenseDoubleMatrix(5, 3, 0.2);
- matrices[1] = new DenseDoubleMatrix(1, 6, 0.8);
- ann.setWeightMatrices(matrices);
- ann.setLearningStyle(LearningStyle.UNSUPERVISED);
-
- FeatureTransformer defaultFeatureTransformer = new DefaultFeatureTransformer();
- ann.setFeatureTransformer(defaultFeatureTransformer);
-
- // write to file
- String modelPath = "/tmp/testSmallLayeredNeuralNetworkReadWrite";
- ann.setModelPath(modelPath);
- try {
- ann.writeModelToFile();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- // read from file
- SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(new HamaConfiguration(), modelPath);
- assertEquals(annCopy.getClass().getSimpleName(), annCopy.getModelType());
- assertEquals(modelPath, annCopy.getModelPath());
- // assertEquals(learningRate, annCopy.getLearningRate(), 0.000001);
- // assertEquals(momentumWeight, annCopy.getMomemtumWeight(), 0.000001);
- // assertEquals(regularizationWeight, annCopy.getRegularizationWeight(),
- // 0.000001);
- assertEquals(TrainingMethod.GRADIENT_DESCENT, annCopy.getTrainingMethod());
- assertEquals(LearningStyle.UNSUPERVISED, annCopy.getLearningStyle());
-
- // compare weights
- DoubleMatrix[] weightsMatrices = annCopy.getWeightMatrices();
- for (int i = 0; i < weightsMatrices.length; ++i) {
- DoubleMatrix expectMat = matrices[i];
- DoubleMatrix actualMat = weightsMatrices[i];
- for (int j = 0; j < expectMat.getRowCount(); ++j) {
- for (int k = 0; k < expectMat.getColumnCount(); ++k) {
- assertEquals(expectMat.get(j, k), actualMat.get(j, k), 0.000001);
- }
- }
- }
-
- FeatureTransformer copyTransformer = annCopy.getFeatureTransformer();
- assertEquals(defaultFeatureTransformer.getClass().getName(), copyTransformer.getClass().getName());
- }
-
- /**
- * Test the forward functionality.
- */
- @Test
- public void testOutput() {
- // first network
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
- ann.addLayer(2, false,
- FunctionFactory.createDoubleFunction("IdentityFunction"));
- ann.addLayer(5, false,
- FunctionFactory.createDoubleFunction("IdentityFunction"));
- ann.addLayer(1, true,
- FunctionFactory.createDoubleFunction("IdentityFunction"));
- ann.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("SquaredError"));
- // ann.setLearningRate(0.1);
- // intentionally initialize all weights to 0.5
- DoubleMatrix[] matrices = new DenseDoubleMatrix[2];
- matrices[0] = new DenseDoubleMatrix(5, 3, 0.5);
- matrices[1] = new DenseDoubleMatrix(1, 6, 0.5);
- ann.setWeightMatrices(matrices);
-
- double[] arr = new double[] { 0, 1 };
- DoubleVector training = new DenseDoubleVector(arr);
- DoubleVector result = ann.getOutput(training);
- assertEquals(1, result.getDimension());
- // assertEquals(3, result.get(0), 0.000001);
-
- // second network
- SmallLayeredNeuralNetwork ann2 = new SmallLayeredNeuralNetwork();
- ann2.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann2.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann2.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann2.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("SquaredError"));
- // ann2.setLearningRate(0.3);
- // intentionally initialize all weights to 0.5
- DoubleMatrix[] matrices2 = new DenseDoubleMatrix[2];
- matrices2[0] = new DenseDoubleMatrix(3, 3, 0.5);
- matrices2[1] = new DenseDoubleMatrix(1, 4, 0.5);
- ann2.setWeightMatrices(matrices2);
-
- double[] test = { 0, 0 };
- double[] result2 = { 0.807476 };
-
- DoubleVector vec = ann2.getOutput(new DenseDoubleVector(test));
- assertArrayEquals(result2, vec.toArray(), 0.000001);
-
- SmallLayeredNeuralNetwork ann3 = new SmallLayeredNeuralNetwork();
- ann3.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann3.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann3.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann3.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("SquaredError"));
- // ann3.setLearningRate(0.3);
- // intentionally initialize all weights to 0.5
- DoubleMatrix[] initMatrices = new DenseDoubleMatrix[2];
- initMatrices[0] = new DenseDoubleMatrix(3, 3, 0.5);
- initMatrices[1] = new DenseDoubleMatrix(1, 4, 0.5);
- ann3.setWeightMatrices(initMatrices);
-
- double[] instance = { 0, 1 };
- DoubleVector output = ann3.getOutput(new DenseDoubleVector(instance));
- assertEquals(0.8315410, output.get(0), 0.000001);
- }
-
- @Test
- public void testXORlocal() {
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
- ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("SquaredError"));
- // ann.setLearningRate(0.5);
- // ann.setMomemtumWeight(0.0);
-
- int iterations = 50000; // the iteration count must be large enough for convergence
- double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
- for (int i = 0; i < iterations; ++i) {
- for (int j = 0; j < instances.length; ++j) {
- DoubleMatrix[] matrices = ann.trainByInstance(new DenseDoubleVector(
- instances[j]));
- ann.updateWeightMatrices(matrices);
- }
- }
-
- for (int i = 0; i < instances.length; ++i) {
- DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
- // the expected output is the last element in array
- double result = instances[i][2];
- double actual = ann.getOutput(input).get(0);
- if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
- Log.info("Neural network failes to lear the XOR.");
- }
- }
-
- // write model into file and read out
- String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocal";
- ann.setModelPath(modelPath);
- try {
- ann.writeModelToFile();
- } catch (IOException e) {
- e.printStackTrace();
- }
- SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(new HamaConfiguration(), modelPath);
- // test on instances
- for (int i = 0; i < instances.length; ++i) {
- DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
- // the expected output is the last element in array
- double result = instances[i][2];
- double actual = annCopy.getOutput(input).get(0);
- if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
- Log.info("Neural network failes to lear the XOR.");
- }
- }
- }
-
- @Test
- public void testXORWithMomentum() {
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
- ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("SquaredError"));
- // ann.setLearningRate(0.6);
- // ann.setMomemtumWeight(0.3);
-
- int iterations = 2000; // fewer iterations are needed once momentum is enabled
- double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
- for (int i = 0; i < iterations; ++i) {
- for (int j = 0; j < instances.length; ++j) {
- ann.trainOnline(new DenseDoubleVector(instances[j]));
- }
- }
-
- for (int i = 0; i < instances.length; ++i) {
- DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
- // the expected output is the last element in array
- double result = instances[i][2];
- double actual = ann.getOutput(input).get(0);
- if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
- Log.info("Neural network failes to lear the XOR.");
- }
- }
-
- // write model into file and read out
- String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithMomentum";
- ann.setModelPath(modelPath);
- try {
- ann.writeModelToFile();
- } catch (IOException e) {
- e.printStackTrace();
- }
- SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(new HamaConfiguration(), modelPath);
- // test on instances
- for (int i = 0; i < instances.length; ++i) {
- DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
- // the expected output is the last element in array
- double result = instances[i][2];
- double actual = annCopy.getOutput(input).get(0);
- if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
- Log.info("Neural network failes to lear the XOR.");
- }
- }
- }
-
- @Test
- public void testXORLocalWithRegularization() {
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
- ann.addLayer(2, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(3, false, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("SquaredError"));
- // ann.setLearningRate(0.7);
- // ann.setMomemtumWeight(0.5);
- // ann.setRegularizationWeight(0.002);
-
- int iterations = 5000; // the iteration count must be large enough for convergence
- double[][] instances = { { 0, 1, 1 }, { 0, 0, 0 }, { 1, 0, 1 }, { 1, 1, 0 } };
- for (int i = 0; i < iterations; ++i) {
- for (int j = 0; j < instances.length; ++j) {
- ann.trainOnline(new DenseDoubleVector(instances[j]));
- }
- }
-
- for (int i = 0; i < instances.length; ++i) {
- DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
- // the expected output is the last element in array
- double result = instances[i][2];
- double actual = ann.getOutput(input).get(0);
- if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
- Log.info("Neural network failes to lear the XOR.");
- }
- }
-
- // write model into file and read out
- String modelPath = "/tmp/testSmallLayeredNeuralNetworkXORLocalWithRegularization";
- ann.setModelPath(modelPath);
- try {
- ann.writeModelToFile();
- } catch (IOException e) {
- e.printStackTrace();
- }
- SmallLayeredNeuralNetwork annCopy = new SmallLayeredNeuralNetwork(new HamaConfiguration(), modelPath);
- // test on instances
- for (int i = 0; i < instances.length; ++i) {
- DoubleVector input = new DenseDoubleVector(instances[i]).slice(2);
- // the expected output is the last element in array
- double result = instances[i][2];
- double actual = annCopy.getOutput(input).get(0);
- if (result < 0.5 && actual >= 0.5 || result >= 0.5 && actual < 0.5) {
- Log.info("Neural network failes to lear the XOR.");
- }
- }
- }
-
- @Test
- public void testTwoClassClassification() {
- // use logistic regression data
- String filepath = "src/test/resources/logistic_regression_data.txt";
- List<double[]> instanceList = new ArrayList<double[]>();
-
- try {
- BufferedReader br = new BufferedReader(new FileReader(filepath));
- String line = null;
- while ((line = br.readLine()) != null) {
- String[] tokens = line.trim().split(",");
- double[] instance = new double[tokens.length];
- for (int i = 0; i < tokens.length; ++i) {
- instance[i] = Double.parseDouble(tokens[i]);
- }
- instanceList.add(instance);
- }
- br.close();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
-
- int dimension = instanceList.get(0).length - 1;
-
- // divide dataset into training and testing
- List<double[]> testInstances = new ArrayList<double[]>();
- testInstances.addAll(instanceList.subList(instanceList.size() - 100,
- instanceList.size()));
- List<double[]> trainingInstances = instanceList.subList(0,
- instanceList.size() - 100);
-
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
- // ann.setLearningRate(0.001);
- // ann.setMomemtumWeight(0.1);
- // ann.setRegularizationWeight(0.01);
- ann.addLayer(dimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(dimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(dimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("CrossEntropy"));
-
- long start = new Date().getTime();
- int iterations = 1000;
- for (int i = 0; i < iterations; ++i) {
- for (double[] trainingInstance : trainingInstances) {
- ann.trainOnline(new DenseDoubleVector(trainingInstance));
- }
- }
- long end = new Date().getTime();
- Log.info(String.format("Training time: %fs\n",
- (double) (end - start) / 1000));
-
- double errorRate = 0;
- // calculate the error on test instance
- for (double[] testInstance : testInstances) {
- DoubleVector instance = new DenseDoubleVector(testInstance);
- double expected = instance.get(instance.getDimension() - 1);
- instance = instance.slice(instance.getDimension() - 1);
- double actual = ann.getOutput(instance).get(0);
- if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
- ++errorRate;
- }
- }
- errorRate /= testInstances.size();
-
- Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
- }
-
- @Test
- public void testLogisticRegression() {
- this.testLogisticRegressionDistributedVersion();
- this.testLogisticRegressionDistributedVersionWithFeatureTransformer();
- }
-
- public void testLogisticRegressionDistributedVersion() {
- // write data into a sequence file
- String tmpStrDatasetPath = "/tmp/logistic_regression_data";
- Path tmpDatasetPath = new Path(tmpStrDatasetPath);
- String strDataPath = "src/test/resources/logistic_regression_data.txt";
- String modelPath = "/tmp/logistic-regression-distributed-model";
-
- Configuration conf = new Configuration();
- List<double[]> instanceList = new ArrayList<double[]>();
- List<double[]> trainingInstances = null;
- List<double[]> testInstances = null;
-
- try {
- FileSystem fs = FileSystem.get(new URI(tmpStrDatasetPath), conf);
- fs.delete(tmpDatasetPath, true);
- if (!fs.exists(tmpDatasetPath)) {
- fs.createNewFile(tmpDatasetPath);
- }
-
- BufferedReader br = new BufferedReader(new FileReader(strDataPath));
- String line = null;
- int count = 0;
- while ((line = br.readLine()) != null) {
- String[] tokens = line.trim().split(",");
- double[] instance = new double[tokens.length];
- for (int i = 0; i < tokens.length; ++i) {
- instance[i] = Double.parseDouble(tokens[i]);
- }
- instanceList.add(instance);
- }
- br.close();
-
- zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
-
- // write training data to a temporary sequence file
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
- tmpDatasetPath, LongWritable.class, VectorWritable.class);
- int testSize = 150;
-
- Collections.shuffle(instanceList);
- testInstances = new ArrayList<double[]>();
- testInstances.addAll(instanceList.subList(instanceList.size() - testSize,
- instanceList.size()));
- trainingInstances = instanceList.subList(0, instanceList.size()
- - testSize);
-
- for (double[] instance : trainingInstances) {
- DoubleVector vec = new DenseDoubleVector(instance);
- writer.append(new LongWritable(count++), new VectorWritable(vec));
- }
- writer.close();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
-
- // create model
- int dimension = 8;
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
- // ann.setLearningRate(0.7);
- // ann.setMomemtumWeight(0.5);
- // ann.setRegularizationWeight(0.1);
- ann.addLayer(dimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(dimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(dimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("CrossEntropy"));
- ann.setModelPath(modelPath);
-
- long start = new Date().getTime();
- Map<String, String> trainingParameters = new HashMap<String, String>();
- trainingParameters.put("tasks", "5");
- trainingParameters.put("training.max.iterations", "2000");
- trainingParameters.put("training.batch.size", "300");
- trainingParameters.put("convergence.check.interval", "1000");
- // ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
-
- long end = new Date().getTime();
-
- // validate results
- double errorRate = 0;
- // calculate the error on test instance
- for (double[] testInstance : testInstances) {
- DoubleVector instance = new DenseDoubleVector(testInstance);
- double expected = instance.get(instance.getDimension() - 1);
- instance = instance.slice(instance.getDimension() - 1);
- double actual = ann.getOutput(instance).get(0);
- if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
- ++errorRate;
- }
- }
- errorRate /= testInstances.size();
-
- Log.info(String.format("Training time: %fs\n",
- (double) (end - start) / 1000));
- Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
- }
-
- public void testLogisticRegressionDistributedVersionWithFeatureTransformer() {
- // write data into a sequence file
- String tmpStrDatasetPath = "/tmp/logistic_regression_data_feature_transformer";
- Path tmpDatasetPath = new Path(tmpStrDatasetPath);
- String strDataPath = "src/test/resources/logistic_regression_data.txt";
- String modelPath = "/tmp/logistic-regression-distributed-model-feature-transformer";
-
- Configuration conf = new Configuration();
- List<double[]> instanceList = new ArrayList<double[]>();
- List<double[]> trainingInstances = null;
- List<double[]> testInstances = null;
-
- try {
- FileSystem fs = FileSystem.get(new URI(tmpStrDatasetPath), conf);
- fs.delete(tmpDatasetPath, true);
- if (!fs.exists(tmpDatasetPath)) {
- fs.createNewFile(tmpDatasetPath);
- }
-
- BufferedReader br = new BufferedReader(new FileReader(strDataPath));
- String line = null;
- int count = 0;
- while ((line = br.readLine()) != null) {
- String[] tokens = line.trim().split(",");
- double[] instance = new double[tokens.length];
- for (int i = 0; i < tokens.length; ++i) {
- instance[i] = Double.parseDouble(tokens[i]);
- }
- instanceList.add(instance);
- }
- br.close();
-
- zeroOneNormalization(instanceList, instanceList.get(0).length - 1);
-
- // write training data to a temporary sequence file
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
- tmpDatasetPath, LongWritable.class, VectorWritable.class);
- int testSize = 150;
-
- Collections.shuffle(instanceList);
- testInstances = new ArrayList<double[]>();
- testInstances.addAll(instanceList.subList(instanceList.size() - testSize,
- instanceList.size()));
- trainingInstances = instanceList.subList(0, instanceList.size()
- - testSize);
-
- for (double[] instance : trainingInstances) {
- DoubleVector vec = new DenseDoubleVector(instance);
- writer.append(new LongWritable(count++), new VectorWritable(vec));
- }
- writer.close();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
-
- // create model
- int dimension = 8;
- SmallLayeredNeuralNetwork ann = new SmallLayeredNeuralNetwork();
- // ann.setLearningRate(0.7);
- // ann.setMomemtumWeight(0.5);
- // ann.setRegularizationWeight(0.1);
- ann.addLayer(dimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(dimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(dimension, false,
- FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.addLayer(1, true, FunctionFactory.createDoubleFunction("Sigmoid"));
- ann.setCostFunction(FunctionFactory
- .createDoubleDoubleFunction("CrossEntropy"));
- ann.setModelPath(modelPath);
-
- FeatureTransformer featureTransformer = new DefaultFeatureTransformer();
-
- ann.setFeatureTransformer(featureTransformer);
-
- long start = new Date().getTime();
- Map<String, String> trainingParameters = new HashMap<String, String>();
- trainingParameters.put("tasks", "5");
- trainingParameters.put("training.max.iterations", "2000");
- trainingParameters.put("training.batch.size", "300");
- trainingParameters.put("convergence.check.interval", "1000");
- // ann.train(new HamaConfiguration(), tmpDatasetPath, trainingParameters);
-
- long end = new Date().getTime();
-
- // validate results
- double errorRate = 0;
- // calculate the error on test instance
- for (double[] testInstance : testInstances) {
- DoubleVector instance = new DenseDoubleVector(testInstance);
- double expected = instance.get(instance.getDimension() - 1);
- instance = instance.slice(instance.getDimension() - 1);
- instance = featureTransformer.transform(instance);
- double actual = ann.getOutput(instance).get(0);
- if (actual < 0.5 && expected >= 0.5 || actual >= 0.5 && expected < 0.5) {
- ++errorRate;
- }
- }
- errorRate /= testInstances.size();
-
- Log.info(String.format("Training time: %fs\n",
- (double) (end - start) / 1000));
- Log.info(String.format("Relative error: %f%%\n", errorRate * 100));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetworkMessage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetworkMessage.java b/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetworkMessage.java
deleted file mode 100644
index e422d95..0000000
--- a/src/test/java/org/apache/horn/bsp/TestSmallLayeredNeuralNetworkMessage.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.horn.bsp;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hama.commons.math.DenseDoubleMatrix;
-import org.apache.hama.commons.math.DoubleMatrix;
-import org.junit.Test;
-
-/**
- * Test the functionality of SmallLayeredNeuralNetworkMessage.
- *
- */
-public class TestSmallLayeredNeuralNetworkMessage {
-
- @Test
- public void testReadWriteWithoutPrev() {
- double error = 0.22;
- double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 },
- { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } };
- double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } };
- DoubleMatrix[] matrices = new DoubleMatrix[2];
- matrices[0] = new DenseDoubleMatrix(matrix1);
- matrices[1] = new DenseDoubleMatrix(matrix2);
-
- boolean isConverge = false;
-
- SmallLayeredNeuralNetworkMessage message = new SmallLayeredNeuralNetworkMessage(
- error, isConverge, matrices, null);
- Configuration conf = new Configuration();
- String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessage";
- Path path = new Path(strPath);
- try {
- FileSystem fs = FileSystem.get(new URI(strPath), conf);
- FSDataOutputStream out = fs.create(path);
- message.write(out);
- out.close();
-
- FSDataInputStream in = fs.open(path);
- SmallLayeredNeuralNetworkMessage readMessage = new SmallLayeredNeuralNetworkMessage(
- 0, isConverge, null, null);
- readMessage.readFields(in);
- in.close();
- assertEquals(error, readMessage.getTrainingError(), 0.000001);
- assertFalse(readMessage.isConverge());
- DoubleMatrix[] readMatrices = readMessage.getCurMatrices();
- assertEquals(2, readMatrices.length);
- for (int i = 0; i < readMatrices.length; ++i) {
- double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i])
- .getValues();
- double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i])
- .getValues();
- for (int r = 0; r < doubleMatrices.length; ++r) {
- assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
- }
- }
-
- DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices();
- assertNull(readPrevMatrices);
-
- // delete
- fs.delete(path, true);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testReadWriteWithPrev() {
- double error = 0.22;
- boolean isConverge = true;
-
- double[][] matrix1 = new double[][] { { 0.1, 0.2, 0.8, 0.5 },
- { 0.3, 0.4, 0.6, 0.2 }, { 0.5, 0.6, 0.1, 0.5 } };
- double[][] matrix2 = new double[][] { { 0.8, 1.2, 0.5 } };
- DoubleMatrix[] matrices = new DoubleMatrix[2];
- matrices[0] = new DenseDoubleMatrix(matrix1);
- matrices[1] = new DenseDoubleMatrix(matrix2);
-
- double[][] prevMatrix1 = new double[][] { { 0.1, 0.1, 0.2, 0.3 },
- { 0.2, 0.4, 0.1, 0.5 }, { 0.5, 0.1, 0.5, 0.2 } };
- double[][] prevMatrix2 = new double[][] { { 0.1, 0.2, 0.5, 0.9 },
- { 0.3, 0.5, 0.2, 0.6 }, { 0.6, 0.8, 0.7, 0.5 } };
-
- DoubleMatrix[] prevMatrices = new DoubleMatrix[2];
- prevMatrices[0] = new DenseDoubleMatrix(prevMatrix1);
- prevMatrices[1] = new DenseDoubleMatrix(prevMatrix2);
-
- SmallLayeredNeuralNetworkMessage message = new SmallLayeredNeuralNetworkMessage(
- error, isConverge, matrices, prevMatrices);
- Configuration conf = new Configuration();
- String strPath = "/tmp/testReadWriteSmallLayeredNeuralNetworkMessageWithPrev";
- Path path = new Path(strPath);
- try {
- FileSystem fs = FileSystem.get(new URI(strPath), conf);
- FSDataOutputStream out = fs.create(path);
- message.write(out);
- out.close();
-
- FSDataInputStream in = fs.open(path);
- SmallLayeredNeuralNetworkMessage readMessage = new SmallLayeredNeuralNetworkMessage(
- 0, isConverge, null, null);
- readMessage.readFields(in);
- in.close();
-
- assertTrue(readMessage.isConverge());
-
- DoubleMatrix[] readMatrices = readMessage.getCurMatrices();
- assertEquals(2, readMatrices.length);
- for (int i = 0; i < readMatrices.length; ++i) {
- double[][] doubleMatrices = ((DenseDoubleMatrix) readMatrices[i])
- .getValues();
- double[][] doubleExpected = ((DenseDoubleMatrix) matrices[i])
- .getValues();
- for (int r = 0; r < doubleMatrices.length; ++r) {
- assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
- }
- }
-
- DoubleMatrix[] readPrevMatrices = readMessage.getPrevMatrices();
- assertEquals(2, readPrevMatrices.length);
- for (int i = 0; i < readPrevMatrices.length; ++i) {
- double[][] doubleMatrices = ((DenseDoubleMatrix) readPrevMatrices[i])
- .getValues();
- double[][] doubleExpected = ((DenseDoubleMatrix) prevMatrices[i])
- .getValues();
- for (int r = 0; r < doubleMatrices.length; ++r) {
- assertArrayEquals(doubleExpected[r], doubleMatrices[r], 0.000001);
- }
- }
-
- // delete
- fs.delete(path, true);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/MLTestBase.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/MLTestBase.java b/src/test/java/org/apache/horn/core/MLTestBase.java
new file mode 100644
index 0000000..3f02600
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/MLTestBase.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Common methods for testing machine learning algorithms.
+ */
+public abstract class MLTestBase {
+
+ /**
+ * Conduct the 0-1 (min-max) normalization in place over the first
+ * {@code len} dimensions of each instance.
+ *
+ * @param instanceList the instances to normalize
+ * @param len the number of leading dimensions to normalize
+ */
+ protected static void zeroOneNormalization(List<double[]> instanceList,
+ int len) {
+ double[] mins = new double[len];
+ double[] maxs = new double[len];
+ Arrays.fill(mins, Double.MAX_VALUE);
+ // Double.MIN_VALUE is the smallest positive double, so seed the maxima
+ // with -Double.MAX_VALUE to handle negative feature values correctly
+ Arrays.fill(maxs, -Double.MAX_VALUE);
+
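+ // first pass: record the per-dimension minimum and maximum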
+ for (double[] instance : instanceList) {
+ for (int i = 0; i < len; ++i) {
+ if (mins[i] > instance[i]) {
+ mins[i] = instance[i];
+ }
+ if (maxs[i] < instance[i]) {
+ maxs[i] = instance[i];
+ }
+ }
+ }
+
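+ // second pass: rescale each dimension into [0, 1]; dimensions with
+ // zero range are left unchanged to avoid division by zero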
+ for (double[] instance : instanceList) {
+ for (int i = 0; i < len; ++i) {
+ double range = maxs[i] - mins[i];
+ if (range != 0) {
+ instance[i] = (instance[i] - mins[i]) / range;
+ }
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/TestAutoEncoder.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestAutoEncoder.java b/src/test/java/org/apache/horn/core/TestAutoEncoder.java
new file mode 100644
index 0000000..10ae738
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/TestAutoEncoder.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.horn.core.AutoEncoder;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+/**
+ * Test the functionality of {@link AutoEncoder}.
+ *
+ */
+public class TestAutoEncoder extends MLTestBase {
+
+ @Test
+ public void testAutoEncoderSimple() {
+ double[][] instances = { { 0, 0, 0, 1 }, { 0, 0, 1, 0 }, { 0, 1, 0, 0 },
+ { 0, 0, 0, 0 } };
+ AutoEncoder encoder = new AutoEncoder(4, 2);
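+ // compress 4-dimensional inputs into a 2-dimensional hidden encoding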
+ // TODO use the configuration
+
+ // encoder.setLearningRate(0.5);
+ // encoder.setMomemtumWeight(0.2);
+
+ int maxIteration = 2000;
+ Random rnd = new Random();
+ for (int iteration = 0; iteration < maxIteration; ++iteration) {
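+ // each sweep trains online on instances.length randomly drawn samples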
+ for (int i = 0; i < instances.length; ++i) {
+ encoder.trainOnline(new DenseDoubleVector(instances[rnd
+ .nextInt(instances.length)]));
+ }
+ }
+
+ for (int i = 0; i < instances.length; ++i) {
+ DoubleVector encodeVec = encoder.encode(new DenseDoubleVector(
+ instances[i]));
+ DoubleVector decodeVec = encoder.decode(encodeVec);
+ for (int d = 0; d < instances[i].length; ++d) {
+ assertEquals(instances[i][d], decodeVec.get(d), 0.1);
+ }
+ }
+
+ }
+
+ @Test
+ public void testAutoEncoderSwissRollDataset() {
+ List<double[]> instanceList = new ArrayList<double[]>();
+ try {
+ BufferedReader br = new BufferedReader(new FileReader(
+ "src/test/resources/dimensional_reduction.txt"));
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ String[] tokens = line.split("\t");
+ double[] instance = new double[tokens.length];
+ for (int i = 0; i < instance.length; ++i) {
+ instance[i] = Double.parseDouble(tokens[i]);
+ }
+ instanceList.add(instance);
+ }
+ br.close();
+ // normalize instances
+ zeroOneNormalization(instanceList, instanceList.get(0).length);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (NumberFormatException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ List<DoubleVector> vecInstanceList = new ArrayList<DoubleVector>();
+ for (double[] instance : instanceList) {
+ vecInstanceList.add(new DenseDoubleVector(instance));
+ }
+ AutoEncoder encoder = new AutoEncoder(3, 2);
+ // encoder.setLearningRate(0.05);
+ // encoder.setMomemtumWeight(0.1);
+ int maxIteration = 2000;
+ for (int iteration = 0; iteration < maxIteration; ++iteration) {
+ for (DoubleVector vector : vecInstanceList) {
+ encoder.trainOnline(vector);
+ }
+ }
+
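+ // count instances whose squared reconstruction error exceeds 0.1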
+ double errorInstance = 0;
+ for (DoubleVector vector : vecInstanceList) {
+ DoubleVector decoded = encoder.getOutput(vector);
+ DoubleVector diff = vector.subtract(decoded);
+ double error = diff.dot(diff);
+ if (error > 0.1) {
+ ++errorInstance;
+ }
+ }
+ Log.info(String.format("Autoecoder error rate: %f%%\n", errorInstance * 100
+ / vecInstanceList.size()));
+
+ }
+
+ @Test
+ public void testAutoEncoderSwissRollDatasetDistributed() {
+ HamaConfiguration conf = new HamaConfiguration();
+ String strDataPath = "/tmp/dimensional_reduction.txt";
+ Path path = new Path(strDataPath);
+ List<double[]> instanceList = new ArrayList<double[]>();
+ try {
+ FileSystem fs = FileSystem.get(new URI(strDataPath), conf);
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+
+ String line = null;
+ BufferedReader br = new BufferedReader(new FileReader(
+ "src/test/resources/dimensional_reduction.txt"));
+ while ((line = br.readLine()) != null) {
+ String[] tokens = line.split("\t");
+ double[] instance = new double[tokens.length];
+ for (int i = 0; i < instance.length; ++i) {
+ instance[i] = Double.parseDouble(tokens[i]);
+ }
+ instanceList.add(instance);
+ }
+ br.close();
+ // normalize instances
+ zeroOneNormalization(instanceList, instanceList.get(0).length);
+
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+ LongWritable.class, VectorWritable.class);
+ for (int i = 0; i < instanceList.size(); ++i) {
+ DoubleVector vector = new DenseDoubleVector(instanceList.get(i));
+ writer.append(new LongWritable(i), new VectorWritable(vector));
+ }
+
+ writer.close();
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
+
+ AutoEncoder encoder = new AutoEncoder(3, 2);
+ String modelPath = "/tmp/autoencoder-modelpath";
+ encoder.setModelPath(modelPath);
+ Map<String, String> trainingParams = new HashMap<String, String>();
+ // encoder.setLearningRate(0.5);
+ trainingParams.put("tasks", "5");
+ trainingParams.put("training.max.iterations", "3000");
+ trainingParams.put("training.batch.size", "200");
+ // encoder.train(conf, path, trainingParams);
+
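+ // count instances whose squared reconstruction error exceeds 0.1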
+ double errorInstance = 0;
+ for (double[] instance : instanceList) {
+ DoubleVector vector = new DenseDoubleVector(instance);
+ DoubleVector decoded = encoder.getOutput(vector);
+ DoubleVector diff = vector.subtract(decoded);
+ double error = diff.dot(diff);
+ if (error > 0.1) {
+ ++errorInstance;
+ }
+ }
+ Log.info(String.format("Autoecoder error rate: %f%%\n", errorInstance * 100
+ / instanceList.size()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-horn/blob/ac8eaf8e/src/test/java/org/apache/horn/core/TestNeuron.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/horn/core/TestNeuron.java b/src/test/java/org/apache/horn/core/TestNeuron.java
new file mode 100644
index 0000000..f2fe4e1
--- /dev/null
+++ b/src/test/java/org/apache/horn/core/TestNeuron.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.horn.core;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.horn.core.Neuron;
+import org.apache.horn.core.Synapse;
+import org.apache.horn.funcs.Sigmoid;
+
+public class TestNeuron extends TestCase {
+ private static double learningRate = 0.1;
+ private static double bias = -1;
+ private static double theta = 0.8;
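+ // the threshold is modeled as a fixed bias input of -1 weighted by theta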
+
+ public static class MyNeuron extends
+ Neuron<Synapse<DoubleWritable, DoubleWritable>> {
+
+ @Override
+ public void setup(HamaConfiguration conf) {
+ }
+
+ @Override
+ public void forward(
+ Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
+ throws IOException {
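+ // accumulate the weighted input sum, add the bias term, then squash
+ // the result with a sigmoid activation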
+ double sum = 0;
+ for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
+ sum += m.getInput() * m.getWeight();
+ }
+ sum += (bias * theta);
+ this.feedforward(new Sigmoid().apply(sum));
+ }
+
+ @Override
+ public void backward(
+ Iterable<Synapse<DoubleWritable, DoubleWritable>> messages)
+ throws IOException {
+ for (Synapse<DoubleWritable, DoubleWritable> m : messages) {
+ // Calculates error gradient for each neuron
+ double gradient = new Sigmoid().applyDerivative(this.getOutput()) * (m.getDelta() * m.getWeight());
+
+ // Propagates to lower layer
+ backpropagate(gradient);
+
+ // Weight correction pushed back to update the incoming synapse
+ double weightDelta = learningRate * this.getOutput() * m.getDelta();
+ this.push(weightDelta);
+ }
+ }
+
+ }
+
+ public void testProp() throws IOException {
+ List<Synapse<DoubleWritable, DoubleWritable>> x = new ArrayList<Synapse<DoubleWritable, DoubleWritable>>();
+ x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
+ 1.0), new DoubleWritable(0.5)));
+ x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
+ 1.0), new DoubleWritable(0.4)));
+
+ MyNeuron n = new MyNeuron();
+ n.forward(x);
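+ // expected output: sigmoid(1.0 * 0.5 + 1.0 * 0.4 + (-1) * 0.8) = sigmoid(0.1)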
+ assertEquals(0.5249791874789399, n.getOutput());
+
+ x.clear();
+ x.add(new Synapse<DoubleWritable, DoubleWritable>(new DoubleWritable(
+ -0.1274), new DoubleWritable(-1.2)));
+ n.backward(x);
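+ // expected update: learningRate * output * delta
+ // = 0.1 * 0.5249791874789399 * -0.1274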
+ assertEquals(-0.006688234848481696, n.getUpdate());
+ }
+
+}