You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/12/11 15:36:19 UTC

svn commit: r1550154 - in /hama/trunk: CHANGES.txt c++/pom.xml core/pom.xml core/src/test/java/org/apache/hama/pipes/ core/src/test/java/org/apache/hama/pipes/TestPipes.java

Author: millecker
Date: Wed Dec 11 14:36:18 2013
New Revision: 1550154

URL: http://svn.apache.org/r1550154
Log:
HAMA-808: Hama Pipes Testcase

Added:
    hama/trunk/core/src/test/java/org/apache/hama/pipes/
    hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/c++/pom.xml
    hama/trunk/core/pom.xml

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1550154&r1=1550153&r2=1550154&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Dec 11 14:36:18 2013
@@ -16,6 +16,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-808: Hama Pipes Testcase (Martin Illecker)
    HAMA-828: Improve code, fix typo and modify unclear comment in org.apache.hama.ml.ann package (Yexi Jiang)
    HAMA-699: Add commons module (Martin Illecker)
    HAMA-818: Remove useless comments in GroomServer (edwardyoon)

Modified: hama/trunk/c++/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pom.xml?rev=1550154&r1=1550153&r2=1550154&view=diff
==============================================================================
--- hama/trunk/c++/pom.xml (original)
+++ hama/trunk/c++/pom.xml Wed Dec 11 14:36:18 2013
@@ -42,6 +42,7 @@
             <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-antrun-plugin</artifactId>
+              <version>1.7</version>
               <executions>
                 <execution>
                   <id>make</id>
@@ -61,6 +62,7 @@
                         </or>
                         <then>
                           <mkdir dir="${project.build.directory}/native" />
+                          <property name="ant.hama.pipes.examples.install" value="${project.build.directory}/native" />
                           <exec executable="cmake" dir="${project.build.directory}/native" failonerror="true">
                             <arg line="${basedir}/src/ -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}" />
                           </exec>
@@ -76,6 +78,7 @@
                         </else>
                       </if>
                     </target>
+                  <exportAntProperties>true</exportAntProperties>
                   </configuration>
                 </execution>
                 <!-- TODO wire here native testcases
@@ -99,6 +102,22 @@
                 </dependency>
               </dependencies>
             </plugin>
+          <plugin>
+              <!--
+                Copies the Ant property "ant.hama.pipes.examples.install"
+                (set in the antrun "make" execution and exported through
+                exportAntProperties) into the JVM system property
+                "hama.pipes.examples.install", where the gmaven execution in
+                core/pom.xml reads it back.
+                NOTE(review): if the Ant branch that sets the property did not
+                run, the value looked up here is presumably null - confirm that
+                the else branch also defines it.
+              -->
+              <groupId>org.codehaus.groovy.maven</groupId>
+              <artifactId>gmaven-plugin</artifactId>
+              <version>1.0</version>
+              <executions>
+                <execution>
+                  <phase>process-test-resources</phase>
+                  <goals><goal>execute</goal></goals>
+                  <configuration>
+                    <source>
+                      System.setProperty("hama.pipes.examples.install", project.properties['ant.hama.pipes.examples.install']);
+                    </source>
+                  </configuration>
+                </execution>
+              </executions>
+            </plugin>
           </plugins>
         </build>
       </profile>

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1550154&r1=1550153&r2=1550154&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Wed Dec 11 14:36:18 2013
@@ -225,6 +225,31 @@
           </execution>
         </executions>
       </plugin>
+    <plugin>
+        <!--
+          Turns the JVM system property "hama.pipes.examples.install" into a
+          Maven project property so that the surefire configuration below can
+          hand it to the forked test JVM.
+          The system property is only present when something earlier in the
+          same Maven invocation has set it; otherwise System.getProperty would
+          return null and Properties.setProperty would fail with a
+          NullPointerException. Falling back to the empty string keeps the
+          build running and lets TestPipes take its "property is empty,
+          skipping" branch.
+        -->
+        <groupId>org.codehaus.groovy.maven</groupId>
+        <artifactId>gmaven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <phase>process-test-resources</phase>
+            <goals><goal>execute</goal></goals>
+            <configuration>
+              <source>
+                project.properties.setProperty('hama.pipes.examples.install', System.getProperty("hama.pipes.examples.install", ""));
+              </source>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <!--
+          Passes the Maven property "hama.pipes.examples.install" to the tests
+          as a system property of the same name; TestPipes reads it through
+          System.getProperty.
+        -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <hama.pipes.examples.install>${hama.pipes.examples.install}</hama.pipes.examples.install>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
     </plugins>
     <finalName>hama-core-${project.version}</finalName>
 

Added: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1550154&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Wed Dec 11 14:36:18 2013
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.FileInputFormat;
+import org.apache.hama.bsp.FileOutputFormat;
+import org.apache.hama.bsp.KeyValueTextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.queue.DiskQueue;
+import org.apache.hama.commons.io.PipesKeyValueWritable;
+import org.apache.hama.commons.io.PipesVectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+import org.junit.Test;
+
+/**
+ * Test case for {@link PipesBSP}
+ * 
+ */
+public class TestPipes extends HamaCluster {
+  private static final Log LOG = LogFactory.getLog(TestPipes.class);
+
+  /**
+   * Name of the system property whose value is used as the path prefix for
+   * the example executables.
+   */
+  public static final String EXAMPLES_INSTALL_PROPERTY = "hama.pipes.examples.install";
+  /** Path of the summation executable, appended to the install prefix. */
+  public static final String EXAMPLE_SUMMATION_EXEC = "/examples/summation";
+  /** Path of the matrix multiplication executable, appended to the install prefix. */
+  public static final String EXAMPLE_MATRIXMULTIPLICATION_EXEC = "/examples/matrixmultiplication";
+  /** Root folder for the job input, output and the copied executable. */
+  public static final String EXAMPLE_TMP_OUTPUT = "/tmp/test-example/";
+  /** Value configured as "bsp.local.dir". */
+  public static final String HAMA_TMP_OUTPUT = "/tmp/hama-test/";
+  /** Value configured under DiskQueue.DISK_QUEUE_PATH_KEY. */
+  public static final String HAMA_TMP_DISK_QUEUE_OUTPUT = "/tmp/messageQueue";
+  /**
+   * Number of decimal places generated inputs are truncated to; 10^-6 derived
+   * from it is also the tolerance used when comparing doubles.
+   */
+  public static final int DOUBLE_PRECISION = 6;
+
+  /** Configuration built in the constructor and used by testPipes. */
+  protected HamaConfiguration configuration;
+
+  public TestPipes() {
+    configuration = new HamaConfiguration();
+
+    try {
+      // Cleanup temp Hama locations
+      FileSystem fs = FileSystem.get(configuration);
+      cleanup(fs, new Path(HAMA_TMP_OUTPUT));
+      cleanup(fs, new Path(HAMA_TMP_DISK_QUEUE_OUTPUT));
+      // Remove local temp folder
+      cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", HAMA_TMP_OUTPUT);
+    configuration
+        .set(DiskQueue.DISK_QUEUE_PATH_KEY, HAMA_TMP_DISK_QUEUE_OUTPUT);
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.client.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  /**
+   * Delegates to the set-up of the parent class without adding any steps.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * Delegates to the tear-down of the parent class without adding any steps.
+   */
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * Runs the summation and the matrix multiplication example executables as
+   * Pipes jobs on randomly generated input and compares their output with
+   * results computed locally. Fails when the install system property is not
+   * defined; returns early (after logging an error) when it is empty.
+   */
+  @Test
+  public void testPipes() throws Exception {
+    final String examplesInstallPath = System
+        .getProperty(EXAMPLES_INSTALL_PROPERTY);
+
+    assertNotNull("System property " + EXAMPLES_INSTALL_PROPERTY
+        + " is not defined!", examplesInstallPath);
+
+    if (examplesInstallPath.isEmpty()) {
+      LOG.error("System property " + EXAMPLES_INSTALL_PROPERTY
+          + " is empty! Skipping TestPipes!");
+      return;
+    }
+
+    LOG.info(EXAMPLES_INSTALL_PROPERTY + " is defined: '"
+        + examplesInstallPath + "'");
+
+    // Locations of the executables and of the job input/output
+    final Path summationBinary = new Path(examplesInstallPath
+        + EXAMPLE_SUMMATION_EXEC);
+    final Path matrixMultiplicationBinary = new Path(examplesInstallPath
+        + EXAMPLE_MATRIXMULTIPLICATION_EXEC);
+    final Path inputDir = new Path(EXAMPLE_TMP_OUTPUT + "testing/in");
+    final Path outputDir = new Path(EXAMPLE_TMP_OUTPUT + "testing/out");
+
+    final FileSystem fileSystem = FileSystem.get(configuration);
+
+    // ---- Summation ----
+    // The returned value is the exact sum of everything written as input
+    final BigDecimal expectedSum = writeSummationInputFile(fileSystem,
+        inputDir);
+    runProgram(getSummationJob(configuration), summationBinary, inputDir,
+        outputDir, 3, this.numOfGroom);
+    verifySummationOutput(configuration, outputDir, expectedSum);
+
+    // Start the second example from empty folders
+    cleanup(fileSystem, inputDir);
+    cleanup(fileSystem, outputDir);
+
+    // ---- MatrixMultiplication ----
+    final Random random = new Random();
+    // nextInt(20) yields 0-19, so each dimension lies between 11 and 30
+    final int rowCount = random.nextInt(20) + 11;
+    final int columnCount = random.nextInt(20) + 11;
+
+    final double[][] matrixA = createRandomMatrix(rowCount, columnCount,
+        random);
+    final double[][] matrixB = createRandomMatrix(columnCount, rowCount,
+        random);
+
+    // A is stored as is, B is stored transposed
+    final Path matrixAFile = writeMatrix(configuration, matrixA, new Path(
+        inputDir, "matrixA.seq"), false);
+    final Path transposedMatrixBFile = writeMatrix(configuration, matrixB,
+        new Path(inputDir, "transposedMatrixB.seq"), true);
+
+    final BSPJob multiplicationJob = getMatrixMultiplicationJob(
+        configuration, transposedMatrixBFile);
+    runProgram(multiplicationJob, matrixMultiplicationBinary, matrixAFile,
+        outputDir, 3, this.numOfGroom);
+
+    // Compare the job output with the locally computed product
+    final double[][] expectedProduct = multiplyMatrix(matrixA, matrixB);
+    verifyMatrixMultiplicationOutput(configuration, outputDir,
+        expectedProduct);
+
+    // Remove local temp folder
+    cleanup(fileSystem, new Path(EXAMPLE_TMP_OUTPUT));
+  }
+
+  static BSPJob getSummationJob(HamaConfiguration conf) throws IOException {
+    BSPJob bsp = new BSPJob(conf);
+    bsp.setInputFormat(KeyValueTextInputFormat.class);
+    bsp.setInputKeyClass(Text.class);
+    bsp.setInputValueClass(Text.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(DoubleWritable.class);
+    bsp.set("bsp.message.class", DoubleWritable.class.getName());
+    return bsp;
+  }
+
+  static BSPJob getMatrixMultiplicationJob(HamaConfiguration conf,
+      Path transposedMatrixB) throws IOException {
+    BSPJob bsp = new BSPJob(conf);
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setInputKeyClass(IntWritable.class);
+    bsp.setInputValueClass(PipesVectorWritable.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(IntWritable.class);
+    bsp.setOutputValueClass(PipesVectorWritable.class);
+    bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName());
+    bsp.setPartitioner(PipesPartitioner.class);
+    // sort sent messages
+    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol");
+    bsp.set("hama.mat.mult.B.path", transposedMatrixB.toString());
+    return bsp;
+  }
+
+  static BigDecimal writeSummationInputFile(FileSystem fs, Path dir)
+      throws IOException {
+    DataOutputStream out = fs.create(new Path(dir, "part0"));
+    Random rand = new Random();
+    double rangeMin = 0;
+    double rangeMax = 100;
+    BigDecimal sum = new BigDecimal(0);
+    // loop between 50 and 149 times
+    for (int i = 0; i < rand.nextInt(100) + 50; i++) {
+      // generate key value pair inputs
+      double randomValue = rangeMin + (rangeMax - rangeMin) * rand.nextDouble();
+      String truncatedValue = new BigDecimal(randomValue).setScale(
+          DOUBLE_PRECISION, BigDecimal.ROUND_DOWN).toString();
+
+      String line = "key" + (i + 1) + "\t" + truncatedValue + "\n";
+      out.writeBytes(line);
+
+      sum = sum.add(new BigDecimal(truncatedValue));
+      LOG.info("input[" + i + "]: '" + line + "' sum: " + sum.toString());
+    }
+    out.close();
+    return sum;
+  }
+
+  /**
+   * Creates a matrix filled row by row with random values in [0, 100), each
+   * truncated to DOUBLE_PRECISION decimal places.
+   *
+   * @param rows number of rows
+   * @param columns number of columns
+   * @param rand source of the random values
+   * @return the generated matrix
+   */
+  static double[][] createRandomMatrix(int rows, int columns, Random rand) {
+    LOG.info("createRandomMatrix rows: " + rows + " cols: " + columns);
+    final double lowerBound = 0;
+    final double upperBound = 100;
+    final double[][] result = new double[rows][columns];
+
+    for (double[] row : result) {
+      for (int col = 0; col < row.length; col++) {
+        final BigDecimal drawn = new BigDecimal(lowerBound
+            + (upperBound - lowerBound) * rand.nextDouble());
+        row[col] = drawn.setScale(DOUBLE_PRECISION, BigDecimal.ROUND_DOWN)
+            .doubleValue();
+      }
+    }
+    return result;
+  }
+
+  static Path writeMatrix(Configuration conf, double[][] matrix, Path path,
+      boolean saveTransposed) {
+    // Write matrix to DFS
+    SequenceFile.Writer writer = null;
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class,
+          PipesVectorWritable.class);
+
+      // Transpose Matrix before saving
+      if (saveTransposed) {
+        int rows = matrix.length;
+        int columns = matrix[0].length;
+        double[][] transposed = new double[columns][rows];
+        for (int i = 0; i < rows; i++) {
+          for (int j = 0; j < columns; j++) {
+            transposed[j][i] = matrix[i][j];
+          }
+        }
+        matrix = transposed;
+      }
+
+      LOG.info("writeRandomDistributedRowMatrix path: " + path
+          + " saveTransposed: " + saveTransposed);
+      for (int i = 0; i < matrix.length; i++) {
+        DenseDoubleVector rowVector = new DenseDoubleVector(matrix[i]);
+        writer.append(new IntWritable(i), new PipesVectorWritable(rowVector));
+        LOG.info("IntWritable: " + i + " PipesVectorWritable: "
+            + rowVector.toString());
+      }
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    } finally {
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    return path;
+  }
+
+  /**
+   * Computes the matrix product A x B.
+   *
+   * @param matrixA left operand; its first row determines the inner dimension
+   * @param matrixB right operand; its first row determines the column count
+   * @return a new matrix with one row per row of A and one column per column
+   *         of B
+   */
+  static double[][] multiplyMatrix(double[][] matrixA, double[][] matrixB) {
+    final double[][] product = new double[matrixA.length][matrixB[0].length];
+    final int rowCount = matrixA.length;
+    final int innerSize = matrixA[0].length;
+    final int columnCount = matrixB[0].length;
+
+    // The inner dimension is the outermost loop: every cell receives its
+    // partial products in ascending order of the inner index.
+    for (int inner = 0; inner < innerSize; inner++) {
+      for (int row = 0; row < rowCount; row++) {
+        for (int col = 0; col < columnCount; col++) {
+          product[row][col] += matrixA[row][inner] * matrixB[inner][col];
+        }
+      }
+    }
+    return product;
+  }
+
+  static void verifyOutput(HamaConfiguration conf, Path outputPath,
+      String[] expectedResults) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] listStatus = fs.listStatus(outputPath);
+    for (FileStatus status : listStatus) {
+      if (!status.isDir()) {
+        if (status.getLen() > 0) {
+          LOG.info("Output File: " + status.getPath());
+          BufferedReader br = new BufferedReader(new InputStreamReader(
+              fs.open(status.getPath())));
+          try {
+            String line = "";
+            int i = 0;
+            while ((line = br.readLine()) != null) {
+              LOG.info("output[" + i + "]: '" + line + "'");
+              LOG.info("expected[" + i + "]: '" + expectedResults[i] + "'");
+              assertEquals("'" + expectedResults[i] + "' != '" + line + "'",
+                  expectedResults[i], line);
+              i++;
+            }
+          } finally {
+            br.close();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks the first record of every file directly below
+   * <code>outputPath</code> against the expected sum, with a tolerance of
+   * 10^-DOUBLE_PRECISION. Files without any record are skipped without an
+   * assertion.
+   *
+   * @param conf configuration used to obtain the file system and the readers
+   * @param outputPath folder holding the sequence files written by the job
+   * @param sum expected sum
+   * @throws IOException if listing or reading fails
+   */
+  static void verifySummationOutput(HamaConfiguration conf, Path outputPath,
+      BigDecimal sum) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] listStatus = fs.listStatus(outputPath);
+    for (FileStatus status : listStatus) {
+      if (!status.isDir()) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+            status.getPath(), conf);
+        try {
+          Text key = new Text();
+          DoubleWritable value = new DoubleWritable();
+          if (reader.next(key, value)) {
+            LOG.info("Output File: " + status.getPath());
+            LOG.info("key: '" + key + "' value: '" + value + "' expected: '"
+                + sum.doubleValue() + "'");
+            assertEquals("Expected value: '" + sum + "' != '" + value + "'",
+                sum.doubleValue(), value.get(),
+                Math.pow(10, (DOUBLE_PRECISION * -1)));
+          }
+        } finally {
+          // Close the reader even when the assertion or the read fails
+          reader.close();
+        }
+      }
+    }
+  }
+
+  static void verifyMatrixMultiplicationOutput(HamaConfiguration conf,
+      Path outputPath, double[][] matrix) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] listStatus = fs.listStatus(outputPath);
+    for (FileStatus status : listStatus) {
+      if (!status.isDir()) {
+        LOG.info("Output File: " + status.getPath());
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+            status.getPath(), conf);
+        IntWritable key = new IntWritable();
+        PipesVectorWritable value = new PipesVectorWritable();
+        int rowIdx = 0;
+        while (reader.next(key, value)) {
+
+          assertEquals("Expected rowIdx: '" + rowIdx + "' != '" + key.get()
+              + "'", rowIdx, key.get());
+
+          DoubleVector rowVector = value.getVector();
+          LOG.info("key: " + key.get() + " value: " + rowVector.toString());
+
+          for (int colIdx = 0; colIdx < rowVector.getLength(); colIdx++) {
+
+            double colValue = rowVector.get(colIdx);
+
+            LOG.info("value[" + rowIdx + "," + colIdx + "]: " + colValue
+                + " expected: " + matrix[rowIdx][colIdx]);
+
+            assertEquals("Expected colValue: '" + matrix[rowIdx][colIdx]
+                + "' != '" + colValue + "' in row: " + rowIdx + " values: "
+                + rowVector.toString(), matrix[rowIdx][colIdx], colValue,
+                Math.pow(10, (DOUBLE_PRECISION * -1)));
+          }
+          rowIdx++;
+        }
+        reader.close();
+      }
+    }
+  }
+
+  static void cleanup(FileSystem fs, Path p) throws IOException {
+    fs.delete(p, true);
+    assertFalse(p.getName() + " not cleaned up", fs.exists(p));
+  }
+
+  /**
+   * Runs an executable as a Pipes job: sets job name, BSP class, input and
+   * output paths, copies the executable to
+   * EXAMPLE_TMP_OUTPUT + "testing/bin/application", registers it as the job
+   * executable, submits the job and finally asserts that the output folder
+   * holds one entry per task.
+   *
+   * @param bsp job to run; its configuration is modified
+   * @param program local path of the executable
+   * @param inputPath job input
+   * @param outputPath job output
+   * @param numBspTasks number of tasks to run, and expected number of
+   *          entries in the output folder
+   * @param numOfGroom number of groom servers the cluster has to report
+   * @throws IOException if a file system operation or the job fails
+   */
+  static void runProgram(BSPJob bsp, Path program, Path inputPath,
+      Path outputPath, int numBspTasks, int numOfGroom) throws IOException {
+
+    HamaConfiguration conf = (HamaConfiguration) bsp.getConfiguration();
+    bsp.setJobName("Test Hama Pipes " + program.getName());
+    bsp.setBspClass(PipesBSP.class);
+
+    FileInputFormat.setInputPaths(bsp, inputPath);
+    FileOutputFormat.setOutputPath(bsp, outputPath);
+
+    Submitter.setIsJavaRecordReader(conf, true);
+    Submitter.setIsJavaRecordWriter(conf, true);
+
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
+
+    // Set bspTaskNum
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    assertEquals(numOfGroom, cluster.getGroomServers());
+    bsp.setNumBspTask(numBspTasks);
+
+    // Copy binary to DFS
+    Path testExec = new Path(EXAMPLE_TMP_OUTPUT + "testing/bin/application");
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(testExec.getParent(), true);
+    fs.copyFromLocalFile(program, testExec);
+
+    // Set Executable
+    Submitter.setExecutable(conf, fs.makeQualified(testExec).toString());
+
+    // Run bspJob
+    Submitter.runJob(bsp);
+
+    LOG.info("Client finishes execution job");
+
+    // check output: expected value first, so that a failure message reports
+    // the two numbers the right way round
+    FileStatus[] listStatus = fs.listStatus(outputPath);
+    assertEquals(numBspTasks, listStatus.length);
+  }
+}