You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by al...@apache.org on 2016/09/22 23:18:16 UTC

[3/4] gora git commit: adding Alfonso Nishikawa reviews and test cases

adding Alfonso Nishikawa reviews and test cases


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/28e10111
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/28e10111
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/28e10111

Branch: refs/heads/master
Commit: 28e1011120468f183d7f1fb6371285946420a34b
Parents: 7063770
Author: Kevin <dj...@yahoo.com>
Authored: Mon Sep 19 12:07:02 2016 +0530
Committer: Kevin <dj...@yahoo.com>
Committed: Mon Sep 19 12:07:02 2016 +0530

----------------------------------------------------------------------
 .../mapreduce/MapReduceSerialization.java       | 51 ++++++++++----------
 .../gora/mapreduce/MapReduceTestUtils.java      | 44 +++++++++++++++++
 2 files changed, 69 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/28e10111/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java b/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java
index fd5c062..a8dcf9e 100644
--- a/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java
+++ b/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java
@@ -43,7 +43,6 @@ public class MapReduceSerialization extends Configured implements Tool {
   private static final Logger LOG = LoggerFactory.getLogger(MapReduceSerialization.class);
 
   public MapReduceSerialization() {
-
   }
 
   public MapReduceSerialization(Configuration conf) {
@@ -56,13 +55,12 @@ public class MapReduceSerialization extends Configured implements Tool {
    */
   public static class CheckDirtyBitsSerializationMapper
           extends GoraMapper<String, WebPage, Text, WebPage> {
-
     @Override
     protected void map(String key, WebPage page, Context context)
-            throws IOException ,InterruptedException {
-      page.setUrl("hola") ;
-      context.write(new Text(key), page) ;
-    };
+            throws IOException, InterruptedException {
+      page.setUrl("hola");
+      context.write(new Text(key), page);
+    }
   }
 
   /**
@@ -71,29 +69,29 @@ public class MapReduceSerialization extends Configured implements Tool {
    */
   public static class CheckDirtyBytesSerializationReducer extends GoraReducer<Text, WebPage,
           String, WebPage> {
-
     @Override
     protected void reduce(Text key, Iterable<WebPage> values, Context context)
-            throws IOException ,InterruptedException {
+            throws IOException, InterruptedException {
       for (WebPage val : values) {
-        LOG.info(key.toString()) ;
-        LOG.info(val.toString()) ;
-        LOG.info(String.valueOf(val.isDirty())) ;
+        LOG.info(key.toString());
+        LOG.info(val.toString());
+        LOG.info(String.valueOf(val.isDirty()));
         context.write(key.toString(), val);
       }
-    };
-
+    }
   }
 
   /**
    * Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
-   * @param inStore
-   * @param query
-   * @return
+   *
+   * @param inStore  input store on MR jobs runs on
+   * @param query    query to select input set run MR
+   * @param outStore output store which stores results of MR jobs
+   * @return job MR job definition
    * @throws IOException
    */
-  public Job createJob(DataStore<String,WebPage> inStore, Query<String,WebPage> query
-          , DataStore<String,WebPage> outStore) throws IOException {
+  public Job createJob(DataStore<String, WebPage> inStore, Query<String, WebPage> query
+          , DataStore<String, WebPage> outStore) throws IOException {
     Job job = new Job(getConf());
 
     job.setJobName("Check serialization of dirty bits");
@@ -118,10 +116,11 @@ public class MapReduceSerialization extends Configured implements Tool {
     return job;
   }
 
-  public int mapReduceSerialization(DataStore<String,WebPage> inStore,
-                                    DataStore<String, WebPage> outStore) throws IOException, InterruptedException, ClassNotFoundException {
-    Query<String,WebPage> query = inStore.newQuery();
-    query.setFields("url") ;
+  public int mapReduceSerialization(DataStore<String, WebPage> inStore,
+                                    DataStore<String, WebPage> outStore)
+          throws IOException, InterruptedException, ClassNotFoundException {
+    Query<String, WebPage> query = inStore.newQuery();
+    query.setFields("url");
 
     Job job = createJob(inStore, query, outStore);
     return job.waitForCompletion(true) ? 0 : 1;
@@ -130,14 +129,14 @@ public class MapReduceSerialization extends Configured implements Tool {
   @Override
   public int run(String[] args) throws Exception {
 
-    DataStore<String,WebPage> inStore;
-    DataStore<String,WebPage> outStore;
+    DataStore<String, WebPage> inStore;
+    DataStore<String, WebPage> outStore;
     Configuration conf = new Configuration();
-    if(args.length > 0) {
+    if (args.length > 0) {
       String dataStoreClass = args[0];
       inStore = DataStoreFactory.getDataStore(dataStoreClass,
               String.class, WebPage.class, conf);
-      if(args.length > 1) {
+      if (args.length > 1) {
         dataStoreClass = args[1];
       }
       outStore = DataStoreFactory.getDataStore(dataStoreClass,

http://git-wip-us.apache.org/repos/asf/gora/blob/28e10111/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
----------------------------------------------------------------------
diff --git a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
index f7a44f0..91ae1bc 100644
--- a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
+++ b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.gora.mapreduce;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,10 +29,12 @@ import org.apache.avro.Schema.Field;
 import org.apache.gora.examples.WebPageDataCreator;
 import org.apache.gora.examples.generated.TokenDatum;
 import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.examples.mapreduce.MapReduceSerialization;
 import org.apache.gora.examples.mapreduce.QueryCounter;
 import org.apache.gora.examples.mapreduce.WordCount;
 import org.apache.gora.examples.spark.SparkWordCount;
 import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.impl.DataStoreBase;
 import org.apache.hadoop.conf.Configuration;
@@ -141,4 +146,43 @@ public class MapReduceTestUtils {
     assertNotNull("token:" + token + " cannot be found in datastore", datum);
     assertEquals("count for token:" + token + " is wrong", count, datum.getCount().intValue());
   }
+
+  public static void testMapReduceSerialization(Configuration conf, DataStore<String, WebPage> inStore, DataStore<String,
+          WebPage> outStore) throws Exception {
+    //Datastore now has to be a Hadoop based datastore
+    ((DataStoreBase<String, WebPage>) inStore).setConf(conf);
+    ((DataStoreBase<String, WebPage>) outStore).setConf(conf);
+
+    //create input
+    WebPage page = WebPage.newBuilder().build();
+    page.setUrl("TestURL");
+    List<CharSequence> content = new ArrayList<CharSequence>();
+    content.add("parsed1");
+    content.add("parsed2");
+    page.setParsedContent(content);
+    page.setContent(ByteBuffer.wrap("content".getBytes(Charset.defaultCharset())));
+    inStore.put("key1", page);
+    inStore.flush();
+
+    // expected
+    WebPage expectedPage = WebPage.newBuilder().build();
+    expectedPage.setUrl("hola");
+    List<CharSequence> expectedContent = new ArrayList<CharSequence>();
+    expectedContent.add("parsed1");
+    expectedContent.add("parsed2");
+    expectedPage.setParsedContent(expectedContent);
+    expectedPage.setContent(ByteBuffer.wrap("content".getBytes(Charset.defaultCharset())));
+
+    //run the job
+    MapReduceSerialization mapReduceSerialization = new MapReduceSerialization(conf);
+    mapReduceSerialization.mapReduceSerialization(inStore, outStore);
+
+    Query<String, WebPage> outputQuery = outStore.newQuery();
+    Result<String, WebPage> serializationResult = outStore.execute(outputQuery);
+
+    while (serializationResult.next()) {
+      assertEquals(expectedPage, serializationResult.get());
+    }
+  }
+
 }