You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/12/11 15:36:19 UTC
svn commit: r1550154 - in /hama/trunk: CHANGES.txt c++/pom.xml core/pom.xml
core/src/test/java/org/apache/hama/pipes/
core/src/test/java/org/apache/hama/pipes/TestPipes.java
Author: millecker
Date: Wed Dec 11 14:36:18 2013
New Revision: 1550154
URL: http://svn.apache.org/r1550154
Log:
HAMA-808: Hama Pipes Testcase
Added:
hama/trunk/core/src/test/java/org/apache/hama/pipes/
hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/c++/pom.xml
hama/trunk/core/pom.xml
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1550154&r1=1550153&r2=1550154&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Dec 11 14:36:18 2013
@@ -16,6 +16,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-808: Hama Pipes Testcase (Martin Illecker)
HAMA-828: Improve code, fix typo and modify unclear comment in org.apache.hama.ml.ann package (Yexi Jiang)
HAMA-699: Add commons module (Martin Illecker)
HAMA-818: Remove useless comments in GroomServer (edwardyoon)
Modified: hama/trunk/c++/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pom.xml?rev=1550154&r1=1550153&r2=1550154&view=diff
==============================================================================
--- hama/trunk/c++/pom.xml (original)
+++ hama/trunk/c++/pom.xml Wed Dec 11 14:36:18 2013
@@ -42,6 +42,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
<executions>
<execution>
<id>make</id>
@@ -61,6 +62,7 @@
</or>
<then>
<mkdir dir="${project.build.directory}/native" />
+ <property name="ant.hama.pipes.examples.install" value="${project.build.directory}/native" />
<exec executable="cmake" dir="${project.build.directory}/native" failonerror="true">
<arg line="${basedir}/src/ -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}" />
</exec>
@@ -76,6 +78,7 @@
</else>
</if>
</target>
+ <exportAntProperties>true</exportAntProperties>
</configuration>
</execution>
<!-- TODO wire here native testcases
@@ -99,6 +102,22 @@
</dependency>
</dependencies>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.groovy.maven</groupId>
+ <artifactId>gmaven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <phase>process-test-resources</phase>
+ <goals><goal>execute</goal></goals>
+ <configuration>
+ <source>
+ System.setProperty("hama.pipes.examples.install", project.properties['ant.hama.pipes.examples.install']);
+ </source>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</profile>
Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1550154&r1=1550153&r2=1550154&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Wed Dec 11 14:36:18 2013
@@ -225,6 +225,31 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.groovy.maven</groupId>
+ <artifactId>gmaven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <phase>process-test-resources</phase>
+ <goals><goal>execute</goal></goals>
+ <configuration>
+ <source>
+ project.properties.setProperty('hama.pipes.examples.install', System.getProperty("hama.pipes.examples.install"));
+ </source>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <hama.pipes.examples.install>${hama.pipes.examples.install}</hama.pipes.examples.install>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
</plugins>
<finalName>hama-core-${project.version}</finalName>
Added: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1550154&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Wed Dec 11 14:36:18 2013
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.FileInputFormat;
+import org.apache.hama.bsp.FileOutputFormat;
+import org.apache.hama.bsp.KeyValueTextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.queue.DiskQueue;
+import org.apache.hama.commons.io.PipesKeyValueWritable;
+import org.apache.hama.commons.io.PipesVectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+import org.junit.Test;
+
+/**
+ * Test case for {@link PipesBSP}
+ *
+ */
+public class TestPipes extends HamaCluster {
+ private static final Log LOG = LogFactory.getLog(TestPipes.class);
+
+ public static final String EXAMPLES_INSTALL_PROPERTY = "hama.pipes.examples.install";
+ public static final String EXAMPLE_SUMMATION_EXEC = "/examples/summation";
+ public static final String EXAMPLE_MATRIXMULTIPLICATION_EXEC = "/examples/matrixmultiplication";
+ public static final String EXAMPLE_TMP_OUTPUT = "/tmp/test-example/";
+ public static final String HAMA_TMP_OUTPUT = "/tmp/hama-test/";
+ public static final String HAMA_TMP_DISK_QUEUE_OUTPUT = "/tmp/messageQueue";
+ public static final int DOUBLE_PRECISION = 6;
+
+ protected HamaConfiguration configuration;
+
+ public TestPipes() {
+ configuration = new HamaConfiguration();
+
+ try {
+ // Cleanup temp Hama locations
+ FileSystem fs = FileSystem.get(configuration);
+ cleanup(fs, new Path(HAMA_TMP_OUTPUT));
+ cleanup(fs, new Path(HAMA_TMP_DISK_QUEUE_OUTPUT));
+ // Remove local temp folder
+ cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ configuration.set("bsp.master.address", "localhost");
+ configuration.set("hama.child.redirect.log.console", "true");
+ assertEquals("Make sure master addr is set to localhost:", "localhost",
+ configuration.get("bsp.master.address"));
+ configuration.set("bsp.local.dir", HAMA_TMP_OUTPUT);
+ configuration
+ .set(DiskQueue.DISK_QUEUE_PATH_KEY, HAMA_TMP_DISK_QUEUE_OUTPUT);
+ configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+ configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+ configuration.set("hama.sync.client.class",
+ org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+ .getCanonicalName());
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ @Test
+ public void testPipes() throws Exception {
+
+ assertNotNull("System property " + EXAMPLES_INSTALL_PROPERTY
+ + " is not defined!", System.getProperty(EXAMPLES_INSTALL_PROPERTY));
+
+ if (System.getProperty(EXAMPLES_INSTALL_PROPERTY).isEmpty()) {
+ LOG.error("System property " + EXAMPLES_INSTALL_PROPERTY
+ + " is empty! Skipping TestPipes!");
+ return;
+ }
+
+ LOG.info(EXAMPLES_INSTALL_PROPERTY + " is defined: '"
+ + System.getProperty(EXAMPLES_INSTALL_PROPERTY) + "'");
+
+ // Setup Paths
+ String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
+ Path summationExec = new Path(examplesInstallPath + EXAMPLE_SUMMATION_EXEC);
+ Path matrixmultiplicationExec = new Path(examplesInstallPath
+ + EXAMPLE_MATRIXMULTIPLICATION_EXEC);
+ Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "testing/in");
+ Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "testing/out");
+
+ FileSystem fs = FileSystem.get(configuration);
+
+ // *** Summation Test ***
+ // Generate Summation input
+ BigDecimal sum = writeSummationInputFile(fs, inputPath);
+
+ // Run Summation example
+ runProgram(getSummationJob(configuration), summationExec, inputPath,
+ outputPath, 3, this.numOfGroom);
+
+ // Verify output
+ verifySummationOutput(configuration, outputPath, sum);
+
+ // Clean input and output folder
+ cleanup(fs, inputPath);
+ cleanup(fs, outputPath);
+
+ // *** MatrixMultiplication Test ***
+ // Generate matrix dimensions
+ Random rand = new Random();
+ // (0-19) + 11 -> between 11-30
+ int rows = rand.nextInt(20) + 11;
+ int cols = rand.nextInt(20) + 11;
+
+ // Generate MatrixMultiplication input
+ double[][] matrixA = createRandomMatrix(rows, cols, rand);
+ double[][] matrixB = createRandomMatrix(cols, rows, rand);
+
+ Path matrixAPath = writeMatrix(configuration, matrixA, new Path(inputPath,
+ "matrixA.seq"), false);
+ Path transposedMatrixBPath = writeMatrix(configuration, matrixB, new Path(
+ inputPath, "transposedMatrixB.seq"), true);
+
+ // Run MatrixMultiplication example
+ runProgram(
+ getMatrixMultiplicationJob(configuration, transposedMatrixBPath),
+ matrixmultiplicationExec, matrixAPath, outputPath, 3, this.numOfGroom);
+
+ // Verify output
+ double[][] matrixC = multiplyMatrix(matrixA, matrixB);
+ verifyMatrixMultiplicationOutput(configuration, outputPath, matrixC);
+
+ // Remove local temp folder
+ cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
+ }
+
+ static BSPJob getSummationJob(HamaConfiguration conf) throws IOException {
+ BSPJob bsp = new BSPJob(conf);
+ bsp.setInputFormat(KeyValueTextInputFormat.class);
+ bsp.setInputKeyClass(Text.class);
+ bsp.setInputValueClass(Text.class);
+ bsp.setOutputFormat(SequenceFileOutputFormat.class);
+ bsp.setOutputKeyClass(Text.class);
+ bsp.setOutputValueClass(DoubleWritable.class);
+ bsp.set("bsp.message.class", DoubleWritable.class.getName());
+ return bsp;
+ }
+
+ static BSPJob getMatrixMultiplicationJob(HamaConfiguration conf,
+ Path transposedMatrixB) throws IOException {
+ BSPJob bsp = new BSPJob(conf);
+ bsp.setInputFormat(SequenceFileInputFormat.class);
+ bsp.setInputKeyClass(IntWritable.class);
+ bsp.setInputValueClass(PipesVectorWritable.class);
+ bsp.setOutputFormat(SequenceFileOutputFormat.class);
+ bsp.setOutputKeyClass(IntWritable.class);
+ bsp.setOutputValueClass(PipesVectorWritable.class);
+ bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName());
+ bsp.setPartitioner(PipesPartitioner.class);
+ // sort sent messages
+ bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+ "org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol");
+ bsp.set("hama.mat.mult.B.path", transposedMatrixB.toString());
+ return bsp;
+ }
+
+ static BigDecimal writeSummationInputFile(FileSystem fs, Path dir)
+ throws IOException {
+ DataOutputStream out = fs.create(new Path(dir, "part0"));
+ Random rand = new Random();
+ double rangeMin = 0;
+ double rangeMax = 100;
+ BigDecimal sum = new BigDecimal(0);
+ // loop between 50 and 149 times
+ for (int i = 0; i < rand.nextInt(100) + 50; i++) {
+ // generate key value pair inputs
+ double randomValue = rangeMin + (rangeMax - rangeMin) * rand.nextDouble();
+ String truncatedValue = new BigDecimal(randomValue).setScale(
+ DOUBLE_PRECISION, BigDecimal.ROUND_DOWN).toString();
+
+ String line = "key" + (i + 1) + "\t" + truncatedValue + "\n";
+ out.writeBytes(line);
+
+ sum = sum.add(new BigDecimal(truncatedValue));
+ LOG.info("input[" + i + "]: '" + line + "' sum: " + sum.toString());
+ }
+ out.close();
+ return sum;
+ }
+
+ static double[][] createRandomMatrix(int rows, int columns, Random rand) {
+ LOG.info("createRandomMatrix rows: " + rows + " cols: " + columns);
+ final double[][] matrix = new double[rows][columns];
+ double rangeMin = 0;
+ double rangeMax = 100;
+
+ for (int i = 0; i < rows; i++) {
+ for (int j = 0; j < columns; j++) {
+ double randomValue = rangeMin + (rangeMax - rangeMin)
+ * rand.nextDouble();
+ matrix[i][j] = new BigDecimal(randomValue).setScale(DOUBLE_PRECISION,
+ BigDecimal.ROUND_DOWN).doubleValue();
+ // matrix[i][j] = rand.nextInt(9) + 1;
+ }
+ }
+ return matrix;
+ }
+
+ static Path writeMatrix(Configuration conf, double[][] matrix, Path path,
+ boolean saveTransposed) {
+ // Write matrix to DFS
+ SequenceFile.Writer writer = null;
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class,
+ PipesVectorWritable.class);
+
+ // Transpose Matrix before saving
+ if (saveTransposed) {
+ int rows = matrix.length;
+ int columns = matrix[0].length;
+ double[][] transposed = new double[columns][rows];
+ for (int i = 0; i < rows; i++) {
+ for (int j = 0; j < columns; j++) {
+ transposed[j][i] = matrix[i][j];
+ }
+ }
+ matrix = transposed;
+ }
+
+ LOG.info("writeRandomDistributedRowMatrix path: " + path
+ + " saveTransposed: " + saveTransposed);
+ for (int i = 0; i < matrix.length; i++) {
+ DenseDoubleVector rowVector = new DenseDoubleVector(matrix[i]);
+ writer.append(new IntWritable(i), new PipesVectorWritable(rowVector));
+ LOG.info("IntWritable: " + i + " PipesVectorWritable: "
+ + rowVector.toString());
+ }
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ return path;
+ }
+
+ static double[][] multiplyMatrix(double[][] matrixA, double[][] matrixB) {
+ final double[][] matrixC = new double[matrixA.length][matrixB[0].length];
+ int m = matrixA.length;
+ int n = matrixA[0].length;
+ int p = matrixB[0].length;
+
+ for (int k = 0; k < n; k++) {
+ for (int i = 0; i < m; i++) {
+ for (int j = 0; j < p; j++) {
+ matrixC[i][j] = matrixC[i][j] + matrixA[i][k] * matrixB[k][j];
+ }
+ }
+ }
+ return matrixC;
+ }
+
+ static void verifyOutput(HamaConfiguration conf, Path outputPath,
+ String[] expectedResults) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] listStatus = fs.listStatus(outputPath);
+ for (FileStatus status : listStatus) {
+ if (!status.isDir()) {
+ if (status.getLen() > 0) {
+ LOG.info("Output File: " + status.getPath());
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ fs.open(status.getPath())));
+ try {
+ String line = "";
+ int i = 0;
+ while ((line = br.readLine()) != null) {
+ LOG.info("output[" + i + "]: '" + line + "'");
+ LOG.info("expected[" + i + "]: '" + expectedResults[i] + "'");
+ assertEquals("'" + expectedResults[i] + "' != '" + line + "'",
+ expectedResults[i], line);
+ i++;
+ }
+ } finally {
+ br.close();
+ }
+ }
+ }
+ }
+ }
+
+ static void verifySummationOutput(HamaConfiguration conf, Path outputPath,
+ BigDecimal sum) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] listStatus = fs.listStatus(outputPath);
+ for (FileStatus status : listStatus) {
+ if (!status.isDir()) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ status.getPath(), conf);
+ Text key = new Text();
+ DoubleWritable value = new DoubleWritable();
+ if (reader.next(key, value)) {
+ LOG.info("Output File: " + status.getPath());
+ LOG.info("key: '" + key + "' value: '" + value + "' expected: '"
+ + sum.doubleValue() + "'");
+ assertEquals("Expected value: '" + sum + "' != '" + value + "'",
+ sum.doubleValue(), value.get(),
+ Math.pow(10, (DOUBLE_PRECISION * -1)));
+ }
+ reader.close();
+ }
+ }
+ }
+
+ static void verifyMatrixMultiplicationOutput(HamaConfiguration conf,
+ Path outputPath, double[][] matrix) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] listStatus = fs.listStatus(outputPath);
+ for (FileStatus status : listStatus) {
+ if (!status.isDir()) {
+ LOG.info("Output File: " + status.getPath());
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ status.getPath(), conf);
+ IntWritable key = new IntWritable();
+ PipesVectorWritable value = new PipesVectorWritable();
+ int rowIdx = 0;
+ while (reader.next(key, value)) {
+
+ assertEquals("Expected rowIdx: '" + rowIdx + "' != '" + key.get()
+ + "'", rowIdx, key.get());
+
+ DoubleVector rowVector = value.getVector();
+ LOG.info("key: " + key.get() + " value: " + rowVector.toString());
+
+ for (int colIdx = 0; colIdx < rowVector.getLength(); colIdx++) {
+
+ double colValue = rowVector.get(colIdx);
+
+ LOG.info("value[" + rowIdx + "," + colIdx + "]: " + colValue
+ + " expected: " + matrix[rowIdx][colIdx]);
+
+ assertEquals("Expected colValue: '" + matrix[rowIdx][colIdx]
+ + "' != '" + colValue + "' in row: " + rowIdx + " values: "
+ + rowVector.toString(), matrix[rowIdx][colIdx], colValue,
+ Math.pow(10, (DOUBLE_PRECISION * -1)));
+ }
+ rowIdx++;
+ }
+ reader.close();
+ }
+ }
+ }
+
+ static void cleanup(FileSystem fs, Path p) throws IOException {
+ fs.delete(p, true);
+ assertFalse(p.getName() + " not cleaned up", fs.exists(p));
+ }
+
+ static void runProgram(BSPJob bsp, Path program, Path inputPath,
+ Path outputPath, int numBspTasks, int numOfGroom) throws IOException {
+
+ HamaConfiguration conf = (HamaConfiguration) bsp.getConfiguration();
+ bsp.setJobName("Test Hama Pipes " + program.getName());
+ bsp.setBspClass(PipesBSP.class);
+
+ FileInputFormat.setInputPaths(bsp, inputPath);
+ FileOutputFormat.setOutputPath(bsp, outputPath);
+
+ Submitter.setIsJavaRecordReader(conf, true);
+ Submitter.setIsJavaRecordWriter(conf, true);
+
+ BSPJobClient jobClient = new BSPJobClient(conf);
+ conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
+
+ // Set bspTaskNum
+ ClusterStatus cluster = jobClient.getClusterStatus(false);
+ assertEquals(numOfGroom, cluster.getGroomServers());
+ bsp.setNumBspTask(numBspTasks);
+
+ // Copy binary to DFS
+ Path testExec = new Path(EXAMPLE_TMP_OUTPUT + "testing/bin/application");
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(testExec.getParent(), true);
+ fs.copyFromLocalFile(program, testExec);
+
+ // Set Executable
+ Submitter.setExecutable(conf, fs.makeQualified(testExec).toString());
+
+ // Run bspJob
+ Submitter.runJob(bsp);
+
+ LOG.info("Client finishes execution job");
+
+ // check output
+ FileStatus[] listStatus = fs.listStatus(outputPath);
+ assertEquals(listStatus.length, numBspTasks);
+ }
+}