You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/07/29 06:15:02 UTC
svn commit: r1507899 - in /mahout/trunk: CHANGELOG
core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java
core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java
Author: smarthi
Date: Mon Jul 29 04:15:02 2013
New Revision: 1507899
URL: http://svn.apache.org/r1507899
Log:
MAHOUT-1284: DummyRecordWriter's bug with reused Writables
Added:
mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java
Modified:
mahout/trunk/CHANGELOG
mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java
Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1507899&r1=1507898&r2=1507899&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Mon Jul 29 04:15:02 2013
@@ -14,6 +14,8 @@ Release 0.9 - unreleased
MAHOUT-1287: classifier.sgd.CsvRecordFactory incorrectly parses CSV format (Alex Franchuk via smarthi)
+ MAHOUT-1284: DummyRecordWriter's bug with reused Writables (Maysam Yabandeh via smarthi)
+
MAHOUT-1275: Dropped bz2 distribution format for source and binaries (sslavic)
Release 0.8 - 2013-07-25
Modified: mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java?rev=1507899&r1=1507898&r2=1507899&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java Mon Jul 29 04:15:02 2013
@@ -18,15 +18,10 @@
package org.apache.mahout.common;
import com.google.common.collect.Lists;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -34,13 +29,47 @@ import org.apache.hadoop.mapreduce.Reduc
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public final class DummyRecordWriter<K extends Writable, V extends Writable> extends RecordWriter<K, V> {
-public final class DummyRecordWriter<K, V> extends RecordWriter<K, V> {
+ private static final Logger log = LoggerFactory.getLogger(DummyRecordWriter.class);
- private final Map<K, List<V>> data = new TreeMap<K, List<V>>();
+ private final Map<K, List<V>> data = Maps.newHashMap();
@Override
public void write(K key, V value) {
+ // if the caller reuses the same Writable instance across calls, we need to store a copy of it;
+ // otherwise the Map content will be modified after the insert
+ try {
+ if (!(key instanceof NullWritable)) {
+ K newKey = (K) key.getClass().newInstance();
+ cloneWritable(key, newKey);
+ key = newKey;
+ }
+ V newValue = (V) value.getClass().newInstance();
+ cloneWritable(value, newValue);
+ value = newValue;
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ } catch (IOException e) {
+ log.error(e.getMessage());
+ }
+
List<V> points = data.get(key);
if (points == null) {
points = Lists.newArrayList();
@@ -49,6 +78,16 @@ public final class DummyRecordWriter<K,
points.add(value);
}
+ private void cloneWritable(Writable from, Writable to) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ from.write(dos);
+ dos.close();
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ DataInputStream dis = new DataInputStream(bais);
+ to.readFields(dis);
+ }
+
@Override
public void close(TaskAttemptContext context) {
}
@@ -101,13 +140,13 @@ public final class DummyRecordWriter<K,
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
private static <K1, V1, K2, V2> Mapper<K1, V1, K2, V2>.Context buildNewMapperContext(
- Configuration configuration, RecordWriter<K2, V2> output) throws Exception {
+ Configuration configuration, RecordWriter<K2, V2> output) throws Exception {
Class<?> mapContextImplClass = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl");
Constructor<?> cons = mapContextImplClass.getConstructors()[0];
Object mapContextImpl = cons.newInstance(configuration,
- new TaskAttemptID(), null, output, null, new DummyStatusReporter(), null);
+ new TaskAttemptID(), null, output, null, new DummyStatusReporter(), null);
Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
Object wrappedMapper = wrappedMapperClass.getConstructor().newInstance();
@@ -115,20 +154,20 @@ public final class DummyRecordWriter<K,
return (Mapper.Context) getMapContext.invoke(wrappedMapper, mapContextImpl);
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
private static <K1, V1, K2, V2> Mapper<K1, V1, K2, V2>.Context buildOldMapperContext(
- Mapper<K1, V1, K2, V2> mapper, Configuration configuration,
- RecordWriter<K2, V2> output) throws Exception {
+ Mapper<K1, V1, K2, V2> mapper, Configuration configuration,
+ RecordWriter<K2, V2> output) throws Exception {
Constructor<?> cons = getNestedContextConstructor(mapper.getClass());
// first argument to the constructor is the enclosing instance
return (Mapper.Context) cons.newInstance(mapper, configuration,
- new TaskAttemptID(), null, output, null, new DummyStatusReporter(), null);
+ new TaskAttemptID(), null, output, null, new DummyStatusReporter(), null);
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
private static <K1, V1, K2, V2> Reducer<K1, V1, K2, V2>.Context buildNewReducerContext(
- Configuration configuration, RecordWriter<K2, V2> output, Class<K1> keyClass,
- Class<V1> valueClass) throws Exception {
+ Configuration configuration, RecordWriter<K2, V2> output, Class<K1> keyClass,
+ Class<V1> valueClass) throws Exception {
Class<?> reduceContextImplClass = Class.forName("org.apache.hadoop.mapreduce.task.ReduceContextImpl");
Constructor<?> cons = reduceContextImplClass.getConstructors()[0];
Object reduceContextImpl = cons.newInstance(configuration,
@@ -148,26 +187,26 @@ public final class DummyRecordWriter<K,
Method getReducerContext = wrappedReducerClass.getMethod("getReducerContext", ReduceContext.class);
return (Reducer.Context) getReducerContext.invoke(wrappedReducer, reduceContextImpl);
}
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
private static <K1, V1, K2, V2> Reducer<K1, V1, K2, V2>.Context buildOldReducerContext(
- Reducer<K1, V1, K2, V2> reducer, Configuration configuration,
- RecordWriter<K2, V2> output, Class<K1> keyClass,
- Class<V1> valueClass) throws Exception {
+ Reducer<K1, V1, K2, V2> reducer, Configuration configuration,
+ RecordWriter<K2, V2> output, Class<K1> keyClass,
+ Class<V1> valueClass) throws Exception {
Constructor<?> cons = getNestedContextConstructor(reducer.getClass());
// first argument to the constructor is the enclosing instance
return (Reducer.Context) cons.newInstance(reducer,
- configuration,
- new TaskAttemptID(),
- new MockIterator(),
- null,
- null,
- output,
- null,
- new DummyStatusReporter(),
- null,
- keyClass,
- valueClass);
+ configuration,
+ new TaskAttemptID(),
+ new MockIterator(),
+ null,
+ null,
+ output,
+ null,
+ new DummyStatusReporter(),
+ null,
+ keyClass,
+ valueClass);
}
private static Constructor<?> getNestedContextConstructor(Class<?> outerClass) {
Added: mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java?rev=1507899&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java Mon Jul 29 04:15:02 2013
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DummyRecordWriterTest {
+
+ @Test
+ public void testWrite() {
+ DummyRecordWriter<IntWritable, VectorWritable> writer =
+ new DummyRecordWriter<IntWritable, VectorWritable>();
+ IntWritable reusableIntWritable = new IntWritable();
+ VectorWritable reusableVectorWritable = new VectorWritable();
+ reusableIntWritable.set(0);
+ reusableVectorWritable.set(new DenseVector(new double[] { 1, 2, 3 }));
+ writer.write(reusableIntWritable, reusableVectorWritable);
+ reusableIntWritable.set(1);
+ reusableVectorWritable.set(new DenseVector(new double[] { 4, 5, 6 }));
+ writer.write(reusableIntWritable, reusableVectorWritable);
+
+ Assert.assertEquals(
+ "The writer must remember the two keys that is written to it", 2,
+ writer.getKeys().size());
+ }
+}