You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2012/05/22 00:14:16 UTC
svn commit: r1341230 - in /avro/trunk: ./ lang/csharp/src/apache/test/Schema/
lang/java/mapred/src/main/java/org/apache/avro/mapred/
lang/java/mapred/src/test/java/org/apache/avro/mapred/
Author: cutting
Date: Mon May 21 22:14:15 2012
New Revision: 1341230
URL: http://svn.apache.org/viewvc?rev=1341230&view=rev
Log:
AVRO-1052. Java: Add AvroMultipleOutputFormat, to permit splitting mapreduce output to multiple locations. Contributed by Ashish Nagavaram.
Added:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java (with props)
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java (with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1341230&r1=1341229&r2=1341230&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon May 21 22:14:15 2012
@@ -16,6 +16,9 @@ Avro 1.7.0 (unreleased)
AVRO-593. Java: Add support for Hadoop's newer mapreduce API.
(Garrett Wu via cutting)
+ AVRO-1052. Java: Add AvroMultipleOutputFormat, to permit splitting
+ mapreduce output to multiple locations. (Ashish Nagavaram via cutting)
+
IMPROVEMENTS
AVRO-1060. Java: Upgrade Netty to version 3.4.0. (Karthik K via cutting)
Modified: avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs?rev=1341230&r1=1341229&r2=1341230&view=diff
==============================================================================
--- avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs (original)
+++ avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs Mon May 21 22:14:15 2012
@@ -48,7 +48,7 @@ namespace Avro.Test
Assert.AreEqual(carefulFP, SchemaNormalization.ParsingFingerprint64(s));
}
- private static IEnumerable<object> ProvideFingerprintTestCases()
+ private static List<object[]> ProvideFingerprintTestCases()
{
using (StreamReader reader = new StreamReader("../../../../../share/test/data/schema-tests.txt"))
{
@@ -56,7 +56,7 @@ namespace Avro.Test
}
}
- private static IEnumerable<object> ProvideCanonicalTestCases()
+ private static List<object[]> ProvideCanonicalTestCases()
{
using (StreamReader reader = new StreamReader("../../../../../share/test/data/schema-tests.txt"))
{
Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java?rev=1341230&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java Mon May 21 22:14:15 2012
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.List;
+import java.util.Set;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Collections;
+
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.avro.Schema;
+
+import org.apache.hadoop.io.NullWritable;
+
+
+
+/**
+ * The AvroMultipleOutputs class simplifies writing Avro output data
+ * to multiple outputs
+ *
+ * <p>
+ * Case one: writing to additional outputs other than the job default output.
+ *
+ * Each additional output, or named output, may be configured with its own
+ * <code>Schema</code> and <code>OutputFormat</code>.
+ * A named output can be a single file or a multi file. The later is refered as
+ * a multi named output which is an unbound set of files all sharing the same
+ * <code>Schema</code>.
+ * </p>
+ * <p>
+ * Case two: to write data to different files provided by user
+ * </p>
+ *
+ * <p>
+ * AvroMultipleOutputs supports counters, by default they are disabled. The
+ * counters group is the {@link AvroMultipleOutputs} class name. The names of the
+ * counters are the same as the output name. These count the number of records
+ * written to each output name. For multi
+ * named outputs the name of the counter is the concatenation of the named
+ * output, and underscore '_' and the multiname.
+ * </p>
+ *
+ * Usage pattern for job submission:
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ * FileOutputFormat.setOutputPath(job, outDir);
+ *
+ * job.setMapperClass(MyAvroMapper.class);
+ * job.setReducerClass(HadoopReducer.class);
+ * job.set("avro.reducer",MyAvroReducer.class);
+ * ...
+ *
+ * Schema schema;
+ * ...
+ * // Defines additional single output 'avro1' for the job
+ * AvroMultipleOutputs.addNamedOutput(job, "avro1", AvroOutputFormat.class,
+ * schema);
+ *
+ * // Defines additional output 'avro2' with different schema for the job
+ * AvroMultipleOutputs.addNamedOutput(job, "avro2",
+ * AvroOutputFormat.class,
+ * null); // if Schema is specified as null then the default output schema is used
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ * <pre>
+ *
+ * public class MyAvroReducer extends
+ * AvroReducer<K, V, OUT> {
+ * private MultipleOutputs amos;
+ *
+ *
+ * public void configure(JobConf conf) {
+ * ...
+ * amos = new AvroMultipleOutputs(conf);
+ * }
+ *
+ * public void reduce(K, Iterator<V> values,
+ * AvroCollector<OUT>, Reporter reporter)
+ * throws IOException {
+ * ...
+ * amos.getCollector("avro1", reporter).collect(datum);
+ * amos.getCollector("avro2", "A", reporter).collect(datum);
+ * amos.getCollector("avro3", "B", reporter).collect(datum);
+ * ...
+ * }
+ *
+ * public void close() throws IOException {
+ * amos.close();
+ * ...
+ * }
+ *
+ * }
+ * </pre>
+ */
+
+public class AvroMultipleOutputs {
+
+ private static final String NAMED_OUTPUTS = "mo.namedOutputs";
+
+ private static final String MO_PREFIX = "mo.namedOutput.";
+
+ private static final String FORMAT = ".avro";
+ private static final String MULTI = ".multi";
+
+ private static final String COUNTERS_ENABLED = "mo.counters";
+
+
+ private static Map<String,Schema> schemaList = new HashMap<String,Schema>();
+ /**
+ * Counters group used by the counters of MultipleOutputs.
+ */
+ private static final String COUNTERS_GROUP = AvroMultipleOutputs.class.getName();
+
+ /**
+ * Checks if a named output is alreadyDefined or not.
+ *
+ * @param conf job conf
+ * @param namedOutput named output names
+ * @param alreadyDefined whether the existence/non-existence of
+ * the named output is to be checked
+ * @throws IllegalArgumentException if the output name is alreadyDefined or
+ * not depending on the value of the
+ * 'alreadyDefined' parameter
+ */
+ private static void checkNamedOutput(JobConf conf, String namedOutput,
+ boolean alreadyDefined) {
+ List<String> definedChannels = getNamedOutputsList(conf);
+ if (alreadyDefined && definedChannels.contains(namedOutput)) {
+ throw new IllegalArgumentException("Named output '" + namedOutput +
+ "' already alreadyDefined");
+ } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
+ throw new IllegalArgumentException("Named output '" + namedOutput +
+ "' not defined");
+ }
+ }
+
+ /**
+ * Checks if a named output name is valid token.
+ *
+ * @param namedOutput named output Name
+ * @throws IllegalArgumentException if the output name is not valid.
+ */
+ private static void checkTokenName(String namedOutput) {
+ if (namedOutput == null || namedOutput.length() == 0) {
+ throw new IllegalArgumentException(
+ "Name cannot be NULL or emtpy");
+ }
+ for (char ch : namedOutput.toCharArray()) {
+ if ((ch >= 'A') && (ch <= 'Z')) {
+ continue;
+ }
+ if ((ch >= 'a') && (ch <= 'z')) {
+ continue;
+ }
+ if ((ch >= '0') && (ch <= '9')) {
+ continue;
+ }
+ throw new IllegalArgumentException(
+ "Name cannot be have a '" + ch + "' char");
+ }
+ }
+
+ /**
+ * Checks if a named output name is valid.
+ *
+ * @param namedOutput named output Name
+ * @throws IllegalArgumentException if the output name is not valid.
+ */
+ private static void checkNamedOutputName(String namedOutput) {
+ checkTokenName(namedOutput);
+ // name cannot be the name used for the default output
+ if (namedOutput.equals("part")) {
+ throw new IllegalArgumentException(
+ "Named output name cannot be 'part'");
+ }
+ }
+
+ /**
+ * Returns list of channel names.
+ *
+ * @param conf job conf
+ * @return List of channel Names
+ */
+ public static List<String> getNamedOutputsList(JobConf conf) {
+ List<String> names = new ArrayList<String>();
+ StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " ");
+ while (st.hasMoreTokens()) {
+ names.add(st.nextToken());
+ }
+ return names;
+ }
+
+
+ /**
+ * Returns if a named output is multiple.
+ *
+ * @param conf job conf
+ * @param namedOutput named output
+ * @return <code>true</code> if the name output is multi, <code>false</code>
+ * if it is single. If the name output is not defined it returns
+ * <code>false</code>
+ */
+ public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
+ checkNamedOutput(conf, namedOutput, false);
+ return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
+ }
+
+ /**
+ * Returns the named output OutputFormat.
+ *
+ * @param conf job conf
+ * @param namedOutput named output
+ * @return namedOutput OutputFormat
+ */
+ public static Class<? extends OutputFormat> getNamedOutputFormatClass(
+ JobConf conf, String namedOutput) {
+ checkNamedOutput(conf, namedOutput, false);
+ return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
+ OutputFormat.class);
+ }
+
+ /**
+ * Adds a named output for the job.
+ * <p/>
+ *
+ * @param conf job conf to add the named output
+ * @param namedOutput named output name, it has to be a word, letters
+ * and numbers only, cannot be the word 'part' as
+ * that is reserved for the
+ * default output.
+ * @param outputFormatClass OutputFormat class.
+ * @param schema Schema to used for this namedOutput
+ */
+ public static void addNamedOutput(JobConf conf, String namedOutput,
+ Class<? extends OutputFormat> outputFormatClass,
+ Schema schema) {
+ addNamedOutput(conf, namedOutput, false, outputFormatClass, schema);
+ }
+
+ /**
+ * Adds a multi named output for the job.
+ * <p/>
+ *
+ * @param conf job conf to add the named output
+ * @param namedOutput named output name, it has to be a word, letters
+ * and numbers only, cannot be the word 'part' as
+ * that is reserved for the
+ * default output.
+ * @param outputFormatClass OutputFormat class.
+ * @param schema Schema to used for this namedOutput
+ */
+ public static void addMultiNamedOutput(JobConf conf, String namedOutput,
+ Class<? extends OutputFormat> outputFormatClass,
+ Schema schema) {
+ addNamedOutput(conf, namedOutput, true, outputFormatClass, schema);
+ }
+
+ /**
+ * Adds a named output for the job.
+ * <p/>
+ *
+ * @param conf job conf to add the named output
+ * @param namedOutput named output name, it has to be a word, letters
+ * and numbers only, cannot be the word 'part' as
+ * that is reserved for the
+ * default output.
+ * @param multi indicates if the named output is multi
+ * @param outputFormatClass OutputFormat class.
+ * @param schema Schema to used for this namedOutput
+ */
+ private static void addNamedOutput(JobConf conf, String namedOutput,
+ boolean multi,
+ Class<? extends OutputFormat> outputFormatClass,
+ Schema schema) {
+ checkNamedOutputName(namedOutput);
+ checkNamedOutput(conf, namedOutput, true);
+ boolean isMapOnly = conf.getNumReduceTasks() == 0;
+ schemaList.put(namedOutput+"_SCHEMA", schema);
+ conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
+ conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
+ OutputFormat.class);
+ conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
+ }
+
+ /**
+ * Enables or disables counters for the named outputs.
+ * <p/>
+ * By default these counters are disabled.
+ * <p/>
+ * MultipleOutputs supports counters, by default the are disabled.
+ * The counters group is the {@link MultipleOutputs} class name.
+ * </p>
+ * The names of the counters are the same as the named outputs. For multi
+ * named outputs the name of the counter is the concatenation of the named
+ * output, and underscore '_' and the multiname.
+ *
+ * @param conf job conf to enableadd the named output.
+ * @param enabled indicates if the counters will be enabled or not.
+ */
+ public static void setCountersEnabled(JobConf conf, boolean enabled) {
+ conf.setBoolean(COUNTERS_ENABLED, enabled);
+ }
+
+ /**
+ * Returns if the counters for the named outputs are enabled or not.
+ * <p/>
+ * By default these counters are disabled.
+ * <p/>
+ * MultipleOutputs supports counters, by default the are disabled.
+ * The counters group is the {@link MultipleOutputs} class name.
+ * </p>
+ * The names of the counters are the same as the named outputs. For multi
+ * named outputs the name of the counter is the concatenation of the named
+ * output, and underscore '_' and the multiname.
+ *
+ *
+ * @param conf job conf to enableadd the named output.
+ * @return TRUE if the counters are enabled, FALSE if they are disabled.
+ */
+ public static boolean getCountersEnabled(JobConf conf) {
+ return conf.getBoolean(COUNTERS_ENABLED, false);
+ }
+
+ // instance code, to be used from Mapper/Reducer code
+
+ private JobConf conf;
+ private OutputFormat outputFormat;
+ private Set<String> namedOutputs;
+ private Map<String, RecordWriter> recordWriters;
+ private boolean countersEnabled;
+
+ /**
+ * Creates and initializes multiple named outputs support, it should be
+ * instantiated in the Mapper/Reducer configure method.
+ *
+ * @param job the job configuration object
+ */
+ public AvroMultipleOutputs(JobConf job) {
+ this.conf = job;
+ outputFormat = new InternalFileOutputFormat();
+ namedOutputs = Collections.unmodifiableSet(
+ new HashSet<String>(AvroMultipleOutputs.getNamedOutputsList(job)));
+ recordWriters = new HashMap<String, RecordWriter>();
+ countersEnabled = getCountersEnabled(job);
+ }
+
+ /**
+ * Returns iterator with the defined name outputs.
+ *
+ * @return iterator with the defined named outputs
+ */
+ public Iterator<String> getNamedOutputs() {
+ return namedOutputs.iterator();
+ }
+
+
+ // by being synchronized MultipleOutputTask can be use with a
+ // MultithreaderMapRunner.
+ private synchronized RecordWriter getRecordWriter(String namedOutput,
+ String baseFileName,
+ final Reporter reporter)
+ throws IOException {
+ RecordWriter writer = recordWriters.get(baseFileName);
+ if (writer == null) {
+ if (countersEnabled && reporter == null) {
+ throw new IllegalArgumentException(
+ "Counters are enabled, Reporter cannot be NULL");
+ }
+ JobConf jobConf = new JobConf(conf);
+ jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
+ FileSystem fs = FileSystem.get(conf);
+ writer = outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
+
+ if (countersEnabled) {
+ if (reporter == null) {
+ throw new IllegalArgumentException(
+ "Counters are enabled, Reporter cannot be NULL");
+ }
+ writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
+ }
+ recordWriters.put(baseFileName, writer);
+ }
+ return writer;
+ }
+
+ private static class RecordWriterWithCounter implements RecordWriter {
+ private RecordWriter writer;
+ private String counterName;
+ private Reporter reporter;
+
+ public RecordWriterWithCounter(RecordWriter writer, String counterName,
+ Reporter reporter) {
+ this.writer = writer;
+ this.counterName = counterName;
+ this.reporter = reporter;
+ }
+
+ @SuppressWarnings({"unchecked"})
+ public void write(Object key, Object value) throws IOException {
+ reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
+ writer.write(key, value);
+ }
+
+ public void close(Reporter reporter) throws IOException {
+ writer.close(reporter);
+ }
+ }
+
+ /**
+ * Gets the output collector for a named output.
+ * <p/>
+ *
+ * @param namedOutput the named output name
+ * @param reporter the reporter
+ * @return the output collector for the given named output
+ * @throws IOException thrown if output collector could not be created
+ */
+ @SuppressWarnings({"unchecked"})
+ public AvroCollector getCollector(String namedOutput, Reporter reporter)
+ throws IOException {
+ return getCollector(namedOutput, null, reporter);
+ }
+
+ /**
+ * Gets the output collector for a multi named output.
+ * <p/>
+ *
+ * @param namedOutput the named output name
+ * @param multiName the multi name part
+ * @param reporter the reporter
+ * @return the output collector for the given named output
+ * @throws IOException thrown if output collector could not be created
+ */
+ @SuppressWarnings({"unchecked"})
+ public AvroCollector getCollector(String namedOutput, String multiName,
+ Reporter reporter)
+ throws IOException {
+
+ checkNamedOutputName(namedOutput);
+ if (!namedOutputs.contains(namedOutput)) {
+ throw new IllegalArgumentException("Undefined named output '" +
+ namedOutput + "'");
+ }
+ boolean multi = isMultiNamedOutput(conf, namedOutput);
+
+ if (!multi && multiName != null) {
+ throw new IllegalArgumentException("Name output '" + namedOutput +
+ "' has not been defined as multi");
+ }
+ if (multi) {
+ checkTokenName(multiName);
+ }
+
+ String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput;
+
+ final RecordWriter writer =
+ getRecordWriter(namedOutput, baseFileName, reporter);
+
+ return new AvroCollector() {
+
+ @SuppressWarnings({"unchecked"})
+ public void collect(Object key) throws IOException{
+ AvroWrapper wrapper = new AvroWrapper(key);
+ writer.write(wrapper, NullWritable.get());
+ }
+
+ public void collect(Object key,Object value) throws IOException
+ {
+ writer.write(key,value);
+ }
+
+ };
+ }
+
+ /**
+ * Closes all the opened named outputs.
+ * <p/>
+ * If overriden subclasses must invoke <code>super.close()</code> at the
+ * end of their <code>close()</code>
+ *
+ * @throws java.io.IOException thrown if any of the MultipleOutput files
+ * could not be closed properly.
+ */
+ public void close() throws IOException {
+ for (RecordWriter writer : recordWriters.values()) {
+ writer.close(null);
+ }
+ }
+
+ private static class InternalFileOutputFormat extends FileOutputFormat<Object, Object> {
+ public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
+
+ @SuppressWarnings({"unchecked"})
+ public RecordWriter<Object, Object> getRecordWriter(FileSystem fs,JobConf job, String baseFileName, Progressable arg3) throws IOException
+ {
+ String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
+ String fileName = getUniqueName(job, baseFileName);
+ Schema schema = schemaList.get(nameOutput+"_SCHEMA");
+ JobConf outputConf = new JobConf(job);
+ outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
+ if(schema!=null)
+ AvroJob.setOutputSchema(outputConf,schema);
+ OutputFormat outputFormat = outputConf.getOutputFormat();
+ return outputFormat.getRecordWriter(fs, outputConf, fileName, arg3);
+ }
+ }
+}
+
Propchange: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java?rev=1341230&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java Mon May 21 22:14:15 2012
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.Locale;
+
+import org.apache.hadoop.io.Text;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.junit.Test;
+
+public class TestAvroMultipleOutputs {
+
+ private static final String UTF8 = "UTF-8";
+
+ public static class MapImpl extends AvroMapper<Utf8, Pair<Utf8, Long>> {
+
+
+
+ @Override
+ public void map(Utf8 text, AvroCollector<Pair<Utf8,Long>> collector,
+ Reporter reporter) throws IOException {
+ StringTokenizer tokens = new StringTokenizer(text.toString());
+ while (tokens.hasMoreTokens())
+ collector.collect(new Pair<Utf8,Long>(new Utf8(tokens.nextToken()),1L));
+ }
+ }
+
+ public static class ReduceImpl
+ extends AvroReducer<Utf8, Long, Pair<Utf8, Long> > {
+ private AvroMultipleOutputs amos;
+
+ public void configure(JobConf Job)
+ {
+ amos=new AvroMultipleOutputs(Job);
+ }
+
+ @Override
+ public void reduce(Utf8 word, Iterable<Long> counts,
+ AvroCollector<Pair<Utf8,Long>> collector,
+ Reporter reporter) throws IOException {
+ long sum = 0;
+ for (long count : counts)
+ sum += count;
+ Pair<Utf8,Long> outputvalue= new Pair<Utf8,Long>(word,sum);
+ amos.getCollector("myavro",reporter).collect(outputvalue);
+ amos.getCollector("myavro1",reporter).collect(outputvalue.toString());
+ collector.collect(new Pair<Utf8,Long>(word, sum));
+ }
+ public void close() throws IOException
+ {
+ amos.close();
+ }
+ }
+
+ @Test public void runTestsInOrder() throws Exception {
+ testJob();
+ testProjection();
+ testProjection1();
+ }
+
+ @SuppressWarnings("deprecation")
+ public void testJob() throws Exception {
+ JobConf job = new JobConf();
+
+// private static final String UTF8 = "UTF-8";
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path outputPath = new Path(dir + "/out");
+
+ outputPath.getFileSystem(job).delete(outputPath);
+ WordCountUtil.writeLinesFile();
+
+ job.setJobName("AvroMultipleOutputs");
+
+ AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
+ AvroJob.setOutputSchema(job,
+ new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema());
+
+ AvroJob.setMapperClass(job, MapImpl.class);
+ AvroJob.setReducerClass(job, ReduceImpl.class);
+
+ FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+ FileOutputFormat.setOutputPath(job, outputPath);
+ FileOutputFormat.setCompressOutput(job, false);
+ AvroMultipleOutputs.addNamedOutput(job,"myavro",AvroOutputFormat.class, new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema());
+ AvroMultipleOutputs.addNamedOutput(job,"myavro1",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));
+
+ WordCountUtil.setMeta(job);
+
+
+ JobClient.runJob(job);
+
+ WordCountUtil.validateCountsFile();
+ }
+
+ @SuppressWarnings("deprecation")
+ public void testProjection() throws Exception {
+ JobConf job = new JobConf();
+
+ Integer defaultRank = new Integer(-1);
+
+ String jsonSchema =
+ "{\"type\":\"record\"," +
+ "\"name\":\"org.apache.avro.mapred.Pair\","+
+ "\"fields\": [ " +
+ "{\"name\":\"rank\", \"type\":\"int\", \"default\": -1}," +
+ "{\"name\":\"value\", \"type\":\"long\"}" +
+ "]}";
+
+ Schema readerSchema = Schema.parse(jsonSchema);
+
+ AvroJob.setInputSchema(job, readerSchema);
+
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path inputPath = new Path(dir + "/out" + "/myavro-r-00000.avro");
+ FileStatus fileStatus = FileSystem.get(job).getFileStatus(inputPath);
+ FileSplit fileSplit = new FileSplit(inputPath, 0, fileStatus.getLen(), job);
+
+
+ AvroRecordReader<Pair<Integer, Long>> recordReader = new AvroRecordReader<Pair<Integer, Long>>(job, fileSplit);
+
+ AvroWrapper<Pair<Integer, Long>> inputPair = new AvroWrapper<Pair<Integer, Long>>(null);
+ NullWritable ignore = NullWritable.get();
+
+ long sumOfCounts = 0;
+ long numOfCounts = 0;
+ while(recordReader.next(inputPair, ignore)) {
+ Assert.assertEquals((Integer)inputPair.datum().get(0), defaultRank);
+ sumOfCounts += (Long) inputPair.datum().get(1);
+ numOfCounts++;
+ }
+
+ Assert.assertEquals(numOfCounts, WordCountUtil.COUNTS.size());
+
+ long actualSumOfCounts = 0;
+ for(Long count : WordCountUtil.COUNTS.values()) {
+ actualSumOfCounts += count;
+ }
+
+ Assert.assertEquals(sumOfCounts, actualSumOfCounts);
+ }
+
+ @SuppressWarnings("deprecation")
+ // Test for a differnt schema output
+ public void testProjection1() throws Exception {
+ JobConf job = new JobConf();
+ Schema readerSchema = Schema.create(Schema.Type.STRING);
+ AvroJob.setInputSchema(job, readerSchema);
+
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path inputPath = new Path(dir + "/out" + "/myavro1-r-00000.avro");
+ FileStatus fileStatus = FileSystem.get(job).getFileStatus(inputPath);
+ FileSplit fileSplit = new FileSplit(inputPath, 0, fileStatus.getLen(), job);
+ AvroWrapper<Utf8> inputPair = new AvroWrapper<Utf8>(null);
+ NullWritable ignore = NullWritable.get();
+ AvroRecordReader<Utf8> recordReader = new AvroRecordReader<Utf8>(job, fileSplit);
+ long sumOfCounts = 0;
+ long numOfCounts = 0;
+ while(recordReader.next(inputPair, ignore)) {
+ sumOfCounts += Long.parseLong(inputPair.datum().toString().split(":")[2].replace("}","").trim());
+ numOfCounts++;
+ }
+ Assert.assertEquals(numOfCounts, WordCountUtil.COUNTS.size());
+ long actualSumOfCounts = 0;
+ for(Long count : WordCountUtil.COUNTS.values()) {
+ actualSumOfCounts += count;
+ }
+ Assert.assertEquals(sumOfCounts, actualSumOfCounts);
+ }
+}
Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java
------------------------------------------------------------------------------
svn:eol-style = native
Re: svn commit: r1341230 - in /avro/trunk: ./ lang/csharp/src/apache/test/Schema/
lang/java/mapred/src/main/java/org/apache/avro/mapred/ lang/java/mapred/src/test/java/org/apache/avro/mapred/
Posted by Doug Cutting <cu...@apache.org>.
On 05/21/2012 03:14 PM, cutting@apache.org wrote:
> avro/trunk/lang/csharp/src/apache/test/Schema/SchemaNormalizationTests.cs
Oops. This change should have been committed as a part of AVRO-1098,
not here.
Doug