Posted to commits@mahout.apache.org by sm...@apache.org on 2013/07/29 06:15:02 UTC

svn commit: r1507899 - in /mahout/trunk: CHANGELOG core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java

Author: smarthi
Date: Mon Jul 29 04:15:02 2013
New Revision: 1507899

URL: http://svn.apache.org/r1507899
Log:
MAHOUT-1284: DummyRecordWriter's bug with reused Writables

Added:
    mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java
Modified:
    mahout/trunk/CHANGELOG
    mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java
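
Background on the bug (a minimal sketch of the failure mode, illustrative only and not part of the commit): Hadoop mappers and reducers commonly reuse a single Writable instance across calls, and the old DummyRecordWriter stored bare references to the keys and values it was handed. Every stored entry therefore aliased the same mutable object, so a later set() silently rewrote earlier entries:

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.io.IntWritable;

  public class ReusedWritableDemo {
    public static void main(String[] args) {
      List<IntWritable> stored = new ArrayList<IntWritable>();
      IntWritable reused = new IntWritable();
      reused.set(0);
      stored.add(reused);   // stores a reference, not a copy
      reused.set(1);        // mutates the object already in the list
      stored.add(reused);
      System.out.println(stored.get(0) + " " + stored.get(1));   // prints "1 1"
    }
  }

The fix below defensively deep-copies each key and value before inserting it.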

Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1507899&r1=1507898&r2=1507899&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Mon Jul 29 04:15:02 2013
@@ -14,6 +14,8 @@ Release 0.9 - unreleased
 
   MAHOUT-1287: classifier.sgd.CsvRecordFactory incorrectly parses CSV format (Alex Franchuk via smarthi)
 
+  MAHOUT-1284: DummyRecordWriter's bug with reused Writables (Maysam Yabandeh via smarthi)
+
   MAHOUT-1275: Dropped bz2 distribution format for source and binaries (sslavic)
 
 Release 0.8 - 2013-07-25

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java?rev=1507899&r1=1507898&r2=1507899&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriter.java Mon Jul 29 04:15:02 2013
@@ -18,15 +18,10 @@
 package org.apache.mahout.common;
 
 import com.google.common.collect.Lists;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -34,13 +29,47 @@ import org.apache.hadoop.mapreduce.Reduc
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public final class DummyRecordWriter<K extends Writable, V extends Writable> extends RecordWriter<K, V> {
 
-public final class DummyRecordWriter<K, V> extends RecordWriter<K, V> {
+  private static final Logger log = LoggerFactory.getLogger(DummyRecordWriter.class);
 
-  private final Map<K, List<V>> data = new TreeMap<K, List<V>>();
+  private final Map<K, List<V>> data = Maps.newHashMap();
 
   @Override
   public void write(K key, V value) {
+    // if the caller reuses the same Writable instance, we need to store a copy;
+    // otherwise the map contents would be mutated after the insert
+    try {
+      if (!(key instanceof NullWritable)) {
+        K newKey = (K) key.getClass().newInstance();
+        cloneWritable(key, newKey);
+        key = newKey;
+      }
+      V newValue = (V) value.getClass().newInstance();
+      cloneWritable(value, newValue);
+      value = newValue;
+    } catch (InstantiationException e) {
+      log.error(e.getMessage());
+    } catch (IllegalAccessException e) {
+      log.error(e.getMessage());
+    } catch (IOException e) {
+      log.error(e.getMessage());
+    }
+
     List<V> points = data.get(key);
     if (points == null) {
       points = Lists.newArrayList();
@@ -49,6 +78,16 @@ public final class DummyRecordWriter<K, 
     points.add(value);
   }
 
+  private void cloneWritable(Writable from, Writable to) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    from.write(dos);
+    dos.close();
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream dis = new DataInputStream(bais);
+    to.readFields(dis);
+  }
+
   @Override
   public void close(TaskAttemptContext context) {
   }
@@ -101,13 +140,13 @@ public final class DummyRecordWriter<K, 
     }
   }
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @SuppressWarnings({"unchecked", "rawtypes"})
   private static <K1, V1, K2, V2> Mapper<K1, V1, K2, V2>.Context buildNewMapperContext(
-      Configuration configuration, RecordWriter<K2, V2> output) throws Exception {
+    Configuration configuration, RecordWriter<K2, V2> output) throws Exception {
     Class<?> mapContextImplClass = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl");
     Constructor<?> cons = mapContextImplClass.getConstructors()[0];
     Object mapContextImpl = cons.newInstance(configuration,
-        new TaskAttemptID(), null, output, null, new DummyStatusReporter(), null);
+      new TaskAttemptID(), null, output, null, new DummyStatusReporter(), null);
 
     Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
     Object wrappedMapper = wrappedMapperClass.getConstructor().newInstance();
@@ -115,20 +154,20 @@ public final class DummyRecordWriter<K, 
     return (Mapper.Context) getMapContext.invoke(wrappedMapper, mapContextImpl);
   }
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @SuppressWarnings({"unchecked", "rawtypes"})
   private static <K1, V1, K2, V2> Mapper<K1, V1, K2, V2>.Context buildOldMapperContext(
-      Mapper<K1, V1, K2, V2> mapper, Configuration configuration,
-      RecordWriter<K2, V2> output) throws Exception {
+    Mapper<K1, V1, K2, V2> mapper, Configuration configuration,
+    RecordWriter<K2, V2> output) throws Exception {
     Constructor<?> cons = getNestedContextConstructor(mapper.getClass());
     // first argument to the constructor is the enclosing instance
     return (Mapper.Context) cons.newInstance(mapper, configuration,
-        new TaskAttemptID(), null, output, null, new DummyStatusReporter(), null);
+      new TaskAttemptID(), null, output, null, new DummyStatusReporter(), null);
   }
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @SuppressWarnings({"unchecked", "rawtypes"})
   private static <K1, V1, K2, V2> Reducer<K1, V1, K2, V2>.Context buildNewReducerContext(
-      Configuration configuration, RecordWriter<K2, V2> output, Class<K1> keyClass,
-      Class<V1> valueClass) throws Exception {
+    Configuration configuration, RecordWriter<K2, V2> output, Class<K1> keyClass,
+    Class<V1> valueClass) throws Exception {
     Class<?> reduceContextImplClass = Class.forName("org.apache.hadoop.mapreduce.task.ReduceContextImpl");
     Constructor<?> cons = reduceContextImplClass.getConstructors()[0];
     Object reduceContextImpl = cons.newInstance(configuration,
@@ -148,26 +187,26 @@ public final class DummyRecordWriter<K, 
     Method getReducerContext = wrappedReducerClass.getMethod("getReducerContext", ReduceContext.class);
     return (Reducer.Context) getReducerContext.invoke(wrappedReducer, reduceContextImpl);
   }
-  
-  @SuppressWarnings({ "unchecked", "rawtypes" })
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
   private static <K1, V1, K2, V2> Reducer<K1, V1, K2, V2>.Context buildOldReducerContext(
-      Reducer<K1, V1, K2, V2> reducer, Configuration configuration,
-      RecordWriter<K2, V2> output, Class<K1> keyClass,
-      Class<V1> valueClass) throws Exception {
+    Reducer<K1, V1, K2, V2> reducer, Configuration configuration,
+    RecordWriter<K2, V2> output, Class<K1> keyClass,
+    Class<V1> valueClass) throws Exception {
     Constructor<?> cons = getNestedContextConstructor(reducer.getClass());
     // first argument to the constructor is the enclosing instance
     return (Reducer.Context) cons.newInstance(reducer,
-        configuration,
-        new TaskAttemptID(),
-        new MockIterator(),
-        null,
-        null,
-        output,
-        null,
-        new DummyStatusReporter(),
-        null,
-        keyClass,
-        valueClass);
+      configuration,
+      new TaskAttemptID(),
+      new MockIterator(),
+      null,
+      null,
+      output,
+      null,
+      new DummyStatusReporter(),
+      null,
+      keyClass,
+      valueClass);
   }
 
   private static Constructor<?> getNestedContextConstructor(Class<?> outerClass) {

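A side note on cloneWritable() above: the serialize/deserialize round trip (write the source Writable to a byte buffer, then readFields() into a fresh instance) is the standard way to deep-copy a Writable. Hadoop also ships an equivalent helper, org.apache.hadoop.io.WritableUtils.clone, which callers could use instead (a sketch, not what this commit does):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.WritableUtils;

  public class CloneDemo {
    public static void main(String[] args) {
      IntWritable original = new IntWritable(42);
      // serializes 'original' and deserializes the bytes into a new instance
      IntWritable copy = WritableUtils.clone(original, new Configuration());
      original.set(7);                 // mutating the original...
      System.out.println(copy.get());  // ...leaves the copy at 42
    }
  }
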
Added: mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java?rev=1507899&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/common/DummyRecordWriterTest.java Mon Jul 29 04:15:02 2013
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DummyRecordWriterTest {
+
+  @Test
+  public void testWrite() {
+    DummyRecordWriter<IntWritable, VectorWritable> writer = 
+        new DummyRecordWriter<IntWritable, VectorWritable>();
+    IntWritable reusableIntWritable = new IntWritable();
+    VectorWritable reusableVectorWritable = new VectorWritable();
+    reusableIntWritable.set(0);
+    reusableVectorWritable.set(new DenseVector(new double[] { 1, 2, 3 }));
+    writer.write(reusableIntWritable, reusableVectorWritable);
+    reusableIntWritable.set(1);
+    reusableVectorWritable.set(new DenseVector(new double[] { 4, 5, 6 }));
+    writer.write(reusableIntWritable, reusableVectorWritable);
+
+    Assert.assertEquals(
+        "The writer must remember the two keys that are written to it", 2,
+        writer.getKeys().size());
+  }
+}
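
Note on the test: before the fix, write() stored the reused IntWritable itself, so the second set(1) mutated the key already sitting in the map and getKeys() would have reported a single key; with the defensive copies in place, the assertion sees both keys, 0 and 1.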