Posted to commits@hbase.apache.org by te...@apache.org on 2012/04/02 15:52:42 UTC

svn commit: r1308354 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapred/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: tedyu
Date: Mon Apr  2 13:52:42 2012
New Revision: 1308354

URL: http://svn.apache.org/viewvc?rev=1308354&view=rev
Log:
HBASE-5663 HBASE-5636 MultithreadedTableMapper doesn't work (Takuya Ueshin)

Added:
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
Removed:
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java
Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
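
In short: MultithreadedTableMapper creates a per-thread Mapper.Context through reflection, and the old lookup was broken in two ways. It omitted the enclosing Mapper instance that the constructor of a non-static inner class takes as its implicit first argument, and it passed concrete runtime classes (SubMapRecordReader.class, the split's class, and so on) to getConstructor(), which matches the declared parameter types (RecordReader, InputSplit, ...) exactly. The patch fixes that lookup for Hadoop 1.x and, when it still fails, falls back to the Hadoop 0.23/2.x classes org.apache.hadoop.mapreduce.task.MapContextImpl and WrappedMapper, loaded reflectively so the code keeps compiling against Hadoop 1.x. The tests are reworked as well: the table is created and loaded up front, job success is asserted instead of assumed, and the verification scans assert that at least one row came back.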

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java?rev=1308354&r1=1308353&r2=1308354&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java Mon Apr  2 13:52:42 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduc
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -31,11 +32,14 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
 
 
@@ -239,15 +243,17 @@ public class MultithreadedTableMapper<K2
           context.getConfiguration());
       try {
         Constructor c = context.getClass().getConstructor(
+          Mapper.class,
           Configuration.class,
-          outer.getTaskAttemptID().getClass(),
-          SubMapRecordReader.class,
-          SubMapRecordWriter.class,
-          context.getOutputCommitter().getClass(),
-          SubMapStatusReporter.class,
-          outer.getInputSplit().getClass());
+          TaskAttemptID.class,
+          RecordReader.class,
+          RecordWriter.class,
+          OutputCommitter.class,
+          StatusReporter.class,
+          InputSplit.class);
         c.setAccessible(true);
         subcontext = (Context) c.newInstance(
+          mapper,
           outer.getConfiguration(), 
           outer.getTaskAttemptID(),
           new SubMapRecordReader(),
@@ -256,8 +262,31 @@ public class MultithreadedTableMapper<K2
           new SubMapStatusReporter(),
           outer.getInputSplit());
       } catch (Exception e) {
-        // rethrow as IOE
-        throw new IOException(e);
+        try {
+          Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor(
+            Configuration.class,
+            TaskAttemptID.class,
+            RecordReader.class,
+            RecordWriter.class,
+            OutputCommitter.class,
+            StatusReporter.class,
+            InputSplit.class);
+          c.setAccessible(true);
+          MapContext mc = (MapContext) c.newInstance(
+            outer.getConfiguration(), 
+            outer.getTaskAttemptID(),
+            new SubMapRecordReader(),
+            new SubMapRecordWriter(),
+            context.getOutputCommitter(),
+            new SubMapStatusReporter(),
+            outer.getInputSplit());
+          Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
+          Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
+          subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
+        } catch (Exception ee) {
+          // rethrow as IOE
+          throw new IOException(e);
+        }
       }
     }
 
@@ -270,4 +299,4 @@ public class MultithreadedTableMapper<K2
       }
     }
   }
-}
\ No newline at end of file
+}
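
The first leg of the fix turns on how Class.getConstructor() resolves parameters: it matches the declared parameter types exactly rather than walking the type hierarchy. A minimal, self-contained sketch of that rule (the class names here are illustrative, not from the patch):

    import java.lang.reflect.Constructor;

    public class ExactMatchDemo {
      public static class Base {}
      public static class Sub extends Base {}
      public static class Target {
        public Target(Base b) {}
      }

      public static void main(String[] args) {
        try {
          // Found: the constructor is declared as Target(Base).
          Constructor<Target> c = Target.class.getConstructor(Base.class);
          System.out.println("declared type: " + c);
          // Throws NoSuchMethodException: nothing is declared as Target(Sub),
          // even though a Sub instance could legally be passed to Target(Base).
          Target.class.getConstructor(Sub.class);
        } catch (NoSuchMethodException e) {
          System.out.println("runtime subclass: " + e);
        }
      }
    }

This is why the hunk above swaps SubMapRecordReader.class for RecordReader.class and so on. The fallback in the new catch block exists because on Hadoop 0.23/2.x that Mapper.Context constructor is gone entirely, so a MapContextImpl is built and handed to WrappedMapper.getMapContext instead.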

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=1308354&r1=1308353&r2=1308354&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Mon Apr  2 13:52:42 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapred;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 
@@ -28,7 +29,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -42,11 +42,15 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
  * on our tables is simple - take every row in the table, reverse the value of
@@ -58,7 +62,7 @@ public class TestTableMapReduce {
     LogFactory.getLog(TestTableMapReduce.class.getName());
   private static final HBaseTestingUtility UTIL =
     new HBaseTestingUtility();
-  static final String MULTI_REGION_TABLE_NAME = "mrtest";
+  static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
   static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
   static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
 
@@ -69,12 +73,10 @@ public class TestTableMapReduce {
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
-    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
-    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
     UTIL.startMiniCluster();
-    HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
-    admin.createTable(desc, HBaseTestingUtility.KEYS);
+    HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
+    UTIL.createMultiRegions(table, INPUT_FAMILY);
+    UTIL.loadTable(table, INPUT_FAMILY);
     UTIL.startMiniMapReduceCluster();
   }
 
@@ -150,7 +152,8 @@ public class TestTableMapReduce {
         IdentityTableReduce.class, jobConf);
 
       LOG.info("Started " + Bytes.toString(table.getTableName()));
-      JobClient.runJob(jobConf);
+      RunningJob job = JobClient.runJob(jobConf);
+      assertTrue(job.isSuccessful());
       LOG.info("After map/reduce completion");
 
       // verify map-reduce results
@@ -184,7 +187,7 @@ public class TestTableMapReduce {
         // continue
       }
     }
-    org.junit.Assert.assertTrue(verified);
+    assertTrue(verified);
   }
 
   /**
@@ -199,7 +202,10 @@ public class TestTableMapReduce {
     TableInputFormat.addColumns(scan, columns);
     ResultScanner scanner = table.getScanner(scan);
     try {
-      for (Result r : scanner) {
+      Iterator<Result> itr = scanner.iterator();
+      assertTrue(itr.hasNext());
+      while(itr.hasNext()) {
+        Result r = itr.next();
         if (LOG.isDebugEnabled()) {
           if (r.size() > 2 ) {
             throw new IOException("Too many results, expected 2 got " +
@@ -247,7 +253,7 @@ public class TestTableMapReduce {
                 r.getRow() + ", first value=" + first + ", second value=" +
                 second);
           }
-          org.junit.Assert.fail();
+          fail();
         }
       }
     } finally {
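
Two test-hardening patterns recur in these hunks. JobClient.runJob() now hands back its RunningJob so success is asserted explicitly, and the verification loop trades for-each for an explicit Iterator so an empty scan fails fast instead of passing vacuously. A condensed sketch of the scanner pattern, assuming table and scan are set up as in the test:

    ResultScanner scanner = table.getScanner(scan);
    try {
      Iterator<Result> itr = scanner.iterator();
      // A for-each over an empty scanner simply would not execute,
      // letting a broken job "verify" successfully.
      assertTrue(itr.hasNext());
      while (itr.hasNext()) {
        Result r = itr.next();
        // ... per-row checks as in verifyAttempt() ...
      }
    } finally {
      scanner.close();
    }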

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java?rev=1308354&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java Mon Apr  2 13:52:42 2012
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+@Category(LargeTests.class)
+public class TestMultithreadedTableMapper {
+  private static final Log LOG = LogFactory.getLog(TestMultithreadedTableMapper.class);
+  private static final HBaseTestingUtility UTIL =
+      new HBaseTestingUtility();
+  static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+  static final int    NUMBER_OF_THREADS = 10;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    UTIL.startMiniCluster();
+    HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
+    UTIL.createMultiRegions(table, INPUT_FAMILY);
+    UTIL.loadTable(table, INPUT_FAMILY);
+    UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniMapReduceCluster();
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Pass the given key and processed record to reduce
+   */
+  public static class ProcessContentsMapper
+  extends TableMapper<ImmutableBytesWritable, Put> {
+
+    /**
+     * Pass the key, and reversed value to reduce
+     *
+     * @param key
+     * @param value
+     * @param context
+     * @throws IOException
+     */
+    public void map(ImmutableBytesWritable key, Result value,
+        Context context)
+            throws IOException, InterruptedException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+      cf = value.getMap();
+      if(!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" +
+            Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+      // Get the original value and reverse it
+      String originalValue = new String(value.getValue(INPUT_FAMILY, null),
+          HConstants.UTF8_ENCODING);
+      StringBuilder newValue = new StringBuilder(originalValue);
+      newValue.reverse();
+      // Now set the value to be collected
+      Put outval = new Put(key.get());
+      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+      context.write(key, outval);
+    }
+  }
+
+  /**
+   * Test MultithreadedTableMapper map/reduce against a multi-region table
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testMultithreadedTableMapper()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()),
+        MULTI_REGION_TABLE_NAME));
+  }
+
+  private void runTestOnTable(HTable table)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = null;
+    try {
+      LOG.info("Before map/reduce startup");
+      job = new Job(table.getConfiguration(), "process column contents");
+      job.setNumReduceTasks(1);
+      Scan scan = new Scan();
+      scan.addFamily(INPUT_FAMILY);
+      TableMapReduceUtil.initTableMapperJob(
+          Bytes.toString(table.getTableName()), scan,
+          MultithreadedTableMapper.class, ImmutableBytesWritable.class,
+          Put.class, job);
+      MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
+      MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
+      TableMapReduceUtil.initTableReducerJob(
+          Bytes.toString(table.getTableName()),
+          IdentityTableReducer.class, job);
+      FileOutputFormat.setOutputPath(job, new Path("test"));
+      LOG.info("Started " + Bytes.toString(table.getTableName()));
+      assertTrue(job.waitForCompletion(true));
+      LOG.info("After map/reduce completion");
+      // verify map-reduce results
+      verify(Bytes.toString(table.getTableName()));
+    } finally {
+      table.close();
+      if (job != null) {
+        FileUtil.fullyDelete(
+            new File(job.getConfiguration().get("hadoop.tmp.dir")));
+      }
+    }
+  }
+
+  private void verify(String tableName) throws IOException {
+    HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName);
+    boolean verified = false;
+    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = UTIL.getConfiguration().getInt("hbase.client.retries.number", 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        LOG.info("Verification attempt #" + i);
+        verifyAttempt(table);
+        verified = true;
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty.  Presume it's because updates came in
+        // after the scanner had been opened.  Wait a while and retry.
+        LOG.debug("Verification attempt failed: " + e.getMessage());
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    assertTrue(verified);
+    table.close();
+  }
+
+  /**
+   * Looks at every value of the mapreduce output and verifies that indeed
+   * the values have been reversed.
+   *
+   * @param table Table to scan.
+   * @throws IOException
+   * @throws NullPointerException if we failed to find a cell value
+   */
+  private void verifyAttempt(final HTable table)
+      throws IOException, NullPointerException {
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    scan.addFamily(OUTPUT_FAMILY);
+    ResultScanner scanner = table.getScanner(scan);
+    try {
+      Iterator<Result> itr = scanner.iterator();
+      assertTrue(itr.hasNext());
+      while(itr.hasNext()) {
+        Result r = itr.next();
+        if (LOG.isDebugEnabled()) {
+          if (r.size() > 2 ) {
+            throw new IOException("Too many results, expected 2 got " +
+                r.size());
+          }
+        }
+        byte[] firstValue = null;
+        byte[] secondValue = null;
+        int count = 0;
+        for(KeyValue kv : r.list()) {
+          if (count == 0) {
+            firstValue = kv.getValue();
+          }else if (count == 1) {
+            secondValue = kv.getValue();
+          }else if (count == 2) {
+            break;
+          }
+          count++;
+        }
+        String first = "";
+        if (firstValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+              ": first value is null");
+        }
+        first = new String(firstValue, HConstants.UTF8_ENCODING);
+        String second = "";
+        if (secondValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+              ": second value is null");
+        }
+        byte[] secondReversed = new byte[secondValue.length];
+        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+          secondReversed[i] = secondValue[j];
+        }
+        second = new String(secondReversed, HConstants.UTF8_ENCODING);
+        if (first.compareTo(second) != 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("second key is not the reverse of first. row=" +
+                Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
+                ", second value=" + second);
+          }
+          fail();
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+  new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
+
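
The new test doubles as usage documentation for the repaired class. Condensed into a standalone sketch, with a placeholder table name and a hypothetical application mapper (MyMapper), the wiring looks like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.MultithreadedTableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;

    public class MultithreadedScanJob {
      // Placeholder application mapper (hypothetical); a real one
      // overrides map(), as ProcessContentsMapper does in the test.
      public static class MyMapper
          extends TableMapper<ImmutableBytesWritable, Put> {}

      public static Job configure(Configuration conf) throws Exception {
        Job job = new Job(conf, "multithreaded scan");
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("contents"));
        // MultithreadedTableMapper is what the framework sees as the mapper...
        TableMapReduceUtil.initTableMapperJob(
            "mytable", scan,
            MultithreadedTableMapper.class,
            ImmutableBytesWritable.class, Put.class, job);
        // ...while the application-level mapper runs on its thread pool.
        MultithreadedTableMapper.setMapperClass(job, MyMapper.class);
        MultithreadedTableMapper.setNumberOfThreads(job, 10);
        return job;
      }
    }

Usage would be configure(HBaseConfiguration.create()) followed by job.waitForCompletion(true), as in runTestOnTable() above.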

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java?rev=1308354&r1=1308353&r2=1308354&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java Mon Apr  2 13:52:42 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapreduc
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 
@@ -30,7 +31,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -59,18 +59,16 @@ public class TestTableMapReduce {
   private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
   private static final HBaseTestingUtility UTIL =
     new HBaseTestingUtility();
-  static final String MULTI_REGION_TABLE_NAME = "mrtest";
+  static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
   static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
   static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
-    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
-    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
     UTIL.startMiniCluster();
-    HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
-    admin.createTable(desc, HBaseTestingUtility.KEYS);
+    HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
+    UTIL.createMultiRegions(table, INPUT_FAMILY);
+    UTIL.loadTable(table, INPUT_FAMILY);
     UTIL.startMiniMapReduceCluster();
   }
 
@@ -150,7 +148,7 @@ public class TestTableMapReduce {
         IdentityTableReducer.class, job);
       FileOutputFormat.setOutputPath(job, new Path("test"));
       LOG.info("Started " + Bytes.toString(table.getTableName()));
-      job.waitForCompletion(true);
+      assertTrue(job.waitForCompletion(true));
       LOG.info("After map/reduce completion");
 
       // verify map-reduce results
@@ -204,7 +202,10 @@ public class TestTableMapReduce {
     scan.addFamily(OUTPUT_FAMILY);
     ResultScanner scanner = table.getScanner(scan);
     try {
-      for (Result r : scanner) {
+      Iterator<Result> itr = scanner.iterator();
+      assertTrue(itr.hasNext());
+      while(itr.hasNext()) {
+        Result r = itr.next();
         if (LOG.isDebugEnabled()) {
           if (r.size() > 2 ) {
             throw new IOException("Too many results, expected 2 got " +