You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@gora.apache.org by al...@apache.org on 2016/09/22 23:18:16 UTC
[3/4] gora git commit: Address Alfonso Nishikawa's review comments and add test cases
Address Alfonso Nishikawa's review comments and add test cases
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/28e10111
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/28e10111
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/28e10111
Branch: refs/heads/master
Commit: 28e1011120468f183d7f1fb6371285946420a34b
Parents: 7063770
Author: Kevin <dj...@yahoo.com>
Authored: Mon Sep 19 12:07:02 2016 +0530
Committer: Kevin <dj...@yahoo.com>
Committed: Mon Sep 19 12:07:02 2016 +0530
----------------------------------------------------------------------
.../mapreduce/MapReduceSerialization.java | 51 ++++++++++----------
.../gora/mapreduce/MapReduceTestUtils.java | 44 +++++++++++++++++
2 files changed, 69 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/28e10111/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java b/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java
index fd5c062..a8dcf9e 100644
--- a/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java
+++ b/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java
@@ -43,7 +43,6 @@ public class MapReduceSerialization extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(MapReduceSerialization.class);
public MapReduceSerialization() {
-
}
public MapReduceSerialization(Configuration conf) {
@@ -56,13 +55,12 @@ public class MapReduceSerialization extends Configured implements Tool {
*/
public static class CheckDirtyBitsSerializationMapper
extends GoraMapper<String, WebPage, Text, WebPage> {
-
@Override
protected void map(String key, WebPage page, Context context)
- throws IOException ,InterruptedException {
- page.setUrl("hola") ;
- context.write(new Text(key), page) ;
- };
+ throws IOException, InterruptedException {
+ page.setUrl("hola");
+ context.write(new Text(key), page);
+ }
}
/**
@@ -71,29 +69,29 @@ public class MapReduceSerialization extends Configured implements Tool {
*/
public static class CheckDirtyBytesSerializationReducer extends GoraReducer<Text, WebPage,
String, WebPage> {
-
@Override
protected void reduce(Text key, Iterable<WebPage> values, Context context)
- throws IOException ,InterruptedException {
+ throws IOException, InterruptedException {
for (WebPage val : values) {
- LOG.info(key.toString()) ;
- LOG.info(val.toString()) ;
- LOG.info(String.valueOf(val.isDirty())) ;
+ LOG.info(key.toString());
+ LOG.info(val.toString());
+ LOG.info(String.valueOf(val.isDirty()));
context.write(key.toString(), val);
}
- };
-
+ }
}
/**
* Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
- * @param inStore
- * @param query
- * @return
+ *
+ * @param inStore input store the MR job reads from
+ * @param query query selecting the input set for the MR job
+ * @param outStore output store the MR job writes its results to
+ * @return the configured MR job
* @throws IOException
*/
- public Job createJob(DataStore<String,WebPage> inStore, Query<String,WebPage> query
- , DataStore<String,WebPage> outStore) throws IOException {
+ public Job createJob(DataStore<String, WebPage> inStore, Query<String, WebPage> query
+ , DataStore<String, WebPage> outStore) throws IOException {
Job job = new Job(getConf());
job.setJobName("Check serialization of dirty bits");
@@ -118,10 +116,11 @@ public class MapReduceSerialization extends Configured implements Tool {
return job;
}
- public int mapReduceSerialization(DataStore<String,WebPage> inStore,
- DataStore<String, WebPage> outStore) throws IOException, InterruptedException, ClassNotFoundException {
- Query<String,WebPage> query = inStore.newQuery();
- query.setFields("url") ;
+ public int mapReduceSerialization(DataStore<String, WebPage> inStore,
+ DataStore<String, WebPage> outStore)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Query<String, WebPage> query = inStore.newQuery();
+ query.setFields("url");
Job job = createJob(inStore, query, outStore);
return job.waitForCompletion(true) ? 0 : 1;
@@ -130,14 +129,14 @@ public class MapReduceSerialization extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- DataStore<String,WebPage> inStore;
- DataStore<String,WebPage> outStore;
+ DataStore<String, WebPage> inStore;
+ DataStore<String, WebPage> outStore;
Configuration conf = new Configuration();
- if(args.length > 0) {
+ if (args.length > 0) {
String dataStoreClass = args[0];
inStore = DataStoreFactory.getDataStore(dataStoreClass,
String.class, WebPage.class, conf);
- if(args.length > 1) {
+ if (args.length > 1) {
dataStoreClass = args[1];
}
outStore = DataStoreFactory.getDataStore(dataStoreClass,
http://git-wip-us.apache.org/repos/asf/gora/blob/28e10111/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
----------------------------------------------------------------------
diff --git a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
index f7a44f0..91ae1bc 100644
--- a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
+++ b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
@@ -18,6 +18,9 @@
package org.apache.gora.mapreduce;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -26,10 +29,12 @@ import org.apache.avro.Schema.Field;
import org.apache.gora.examples.WebPageDataCreator;
import org.apache.gora.examples.generated.TokenDatum;
import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.examples.mapreduce.MapReduceSerialization;
import org.apache.gora.examples.mapreduce.QueryCounter;
import org.apache.gora.examples.mapreduce.WordCount;
import org.apache.gora.examples.spark.SparkWordCount;
import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.hadoop.conf.Configuration;
@@ -141,4 +146,43 @@ public class MapReduceTestUtils {
assertNotNull("token:" + token + " cannot be found in datastore", datum);
assertEquals("count for token:" + token + " is wrong", count, datum.getCount().intValue());
}
+
+ public static void testMapReduceSerialization(Configuration conf, DataStore<String, WebPage> inStore, DataStore<String,
+ WebPage> outStore) throws Exception {
+ //Datastore now has to be a Hadoop based datastore
+ ((DataStoreBase<String, WebPage>) inStore).setConf(conf);
+ ((DataStoreBase<String, WebPage>) outStore).setConf(conf);
+
+ //create input
+ WebPage page = WebPage.newBuilder().build();
+ page.setUrl("TestURL");
+ List<CharSequence> content = new ArrayList<CharSequence>();
+ content.add("parsed1");
+ content.add("parsed2");
+ page.setParsedContent(content);
+ page.setContent(ByteBuffer.wrap("content".getBytes(Charset.defaultCharset())));
+ inStore.put("key1", page);
+ inStore.flush();
+
+ // expected
+ WebPage expectedPage = WebPage.newBuilder().build();
+ expectedPage.setUrl("hola");
+ List<CharSequence> expectedContent = new ArrayList<CharSequence>();
+ expectedContent.add("parsed1");
+ expectedContent.add("parsed2");
+ expectedPage.setParsedContent(expectedContent);
+ expectedPage.setContent(ByteBuffer.wrap("content".getBytes(Charset.defaultCharset())));
+
+ //run the job
+ MapReduceSerialization mapReduceSerialization = new MapReduceSerialization(conf);
+ mapReduceSerialization.mapReduceSerialization(inStore, outStore);
+
+ Query<String, WebPage> outputQuery = outStore.newQuery();
+ Result<String, WebPage> serializationResult = outStore.execute(outputQuery);
+
+ while (serializationResult.next()) {
+ assertEquals(expectedPage, serializationResult.get());
+ }
+ }
+
}