Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/10/22 20:08:18 UTC

svn commit: r1400976 - in /accumulo/branches/1.4/src/core/src: main/java/org/apache/accumulo/core/client/mapreduce/ test/java/org/apache/accumulo/core/client/mapreduce/

Author: kturner
Date: Mon Oct 22 18:08:17 2012
New Revision: 1400976

URL: http://svn.apache.org/viewvc?rev=1400976&view=rev
Log:
ACCUMULO-826 ACCUMULO-507 reverted revisions 1397700, 1382923, 1339308, 1339223, and 1336322. These changes caused MapReduce jobs to fail if the process that started the job exited.
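
For context, the reverted code wrote the Base64-encoded password to an HDFS file, registered the file with FileSystem.deleteOnExit(), and shipped it to tasks through the DistributedCache. deleteOnExit() removes the path when the submitting client's FileSystem shuts down, not when the job finishes, so tasks that had not yet read the file failed once the client exited. A minimal sketch of that failure mode (the path and job name are hypothetical; it mirrors the removed setOutputInfo/setInputInfo code below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DeleteOnExitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical path, mirroring the "<job name><timestamp>.pw"
        // convention in the reverted code.
        Path pwFile = new Path(fs.getWorkingDirectory(), "myjob.pw");

        FSDataOutputStream fos = fs.create(pwFile, false);
        fos.writeBytes("base64-encoded-password");
        fos.close();

        // Deletes the file when THIS client JVM shuts down -- not when
        // the MapReduce job completes.
        fs.deleteOnExit(pwFile);

        // If this JVM exits here (client disconnects, machine reboots),
        // the file vanishes, and any task that has not yet called
        // getPassword() fails with a FileNotFoundException.
      }
    }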

Modified:
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1400976&r1=1400975&r2=1400976&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Mon Oct 22 18:08:17 2012
@@ -38,13 +38,6 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -75,7 +68,7 @@ public class AccumuloOutputFormat extend
   private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
   private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD_PATH = PREFIX + ".password";
+  private static final String PASSWORD = PREFIX + ".password";
   private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable";
   
   private static final String INSTANCE_NAME = PREFIX + ".instanceName";
@@ -135,28 +128,10 @@ public class AccumuloOutputFormat extend
     
     ArgumentChecker.notNull(user, passwd);
     conf.set(USERNAME, user);
+    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
     conf.setBoolean(CREATETABLES, createTables);
     if (defaultTable != null)
       conf.set(DEFAULT_TABLE_NAME, defaultTable);
-    
-    try {
-      FileSystem fs = FileSystem.get(conf);
-      Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
-      conf.set(PASSWORD_PATH, file.toString());
-      FSDataOutputStream fos = fs.create(file, false);
-      fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
-      fs.deleteOnExit(file);
-      
-      byte[] encodedPw = Base64.encodeBase64(passwd);
-      fos.writeInt(encodedPw.length);
-      fos.write(encodedPw);
-      fos.close();
-      
-      DistributedCache.addCacheFile(file.toUri(), conf);
-    } catch (IOException ioe) {
-      throw new RuntimeException(ioe);
-    }
-
   }
   
   /**
@@ -257,28 +232,21 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @throws IOException
+   * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; it is BASE64 encoded to provide a charset-safe conversion to a
+   * string, and is not intended to be secure.
    * 
    * @deprecated Use {@link #getPassword(Configuration)} instead
    */
-  protected static byte[] getPassword(JobContext job) throws IOException {
+  protected static byte[] getPassword(JobContext job) {
     return getPassword(job.getConfiguration());
   }
   
   /**
-   * @throws IOException
+   * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; it is BASE64 encoded to provide a charset-safe conversion to a
+   * string, and is not intended to be secure.
    */
-  protected static byte[] getPassword(Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path file = new Path(conf.get(PASSWORD_PATH));
-    
-    FSDataInputStream fdis = fs.open(file);
-    int length = fdis.readInt();
-    byte[] encodedPassword = new byte[length];
-    fdis.read(encodedPassword);
-    fdis.close();
-    
-    return Base64.decodeBase64(encodedPassword);
+  protected static byte[] getPassword(Configuration conf) {
+    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
   }
   
   /**
@@ -386,7 +354,7 @@ public class AccumuloOutputFormat extend
     
     private Connector conn;
     
-    AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException, IOException {
+    AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException {
       Level l = getLogLevel(attempt);
       if (l != null)
         log.setLevel(getLogLevel(attempt));

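With the revert in place, AccumuloOutputFormat again carries the credentials inside the job Configuration. A hedged usage sketch follows; the instance name, ZooKeeper hosts, and credentials are placeholders, and setZooKeeperInstance is assumed to be the 1.4 Configuration-based companion to the setOutputInfo shown above:

    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class OutputFormatSetupSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "write-to-accumulo");
        job.setOutputFormatClass(AccumuloOutputFormat.class);

        // Username and Base64-encoded password go straight into the
        // Configuration; the encoding is for charset safety, not security.
        AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), "user",
            "secret".getBytes(), true /* createTables */, "defaulttable");
        AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
            "myinstance", "zkhost1:2181,zkhost2:2181");
      }
    }
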
Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1400976&r1=1400975&r2=1400976&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Mon Oct 22 18:08:17 2012
@@ -16,8 +16,12 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.InvocationTargetException;
@@ -77,13 +81,6 @@ import org.apache.accumulo.core.util.Tex
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -118,7 +115,7 @@ public abstract class InputFormatBase<K,
   private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
   private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD_PATH = PREFIX + ".password";
+  private static final String PASSWORD = PREFIX + ".password";
   private static final String TABLE_NAME = PREFIX + ".tablename";
   private static final String AUTHORIZATIONS = PREFIX + ".authorizations";
   
@@ -217,28 +214,10 @@ public abstract class InputFormatBase<K,
     
     ArgumentChecker.notNull(user, passwd, table);
     conf.set(USERNAME, user);
+    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
     conf.set(TABLE_NAME, table);
     if (auths != null && !auths.isEmpty())
       conf.set(AUTHORIZATIONS, auths.serialize());
-    
-    try {
-      FileSystem fs = FileSystem.get(conf);
-      Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
-      conf.set(PASSWORD_PATH, file.toString());
-      FSDataOutputStream fos = fs.create(file, false);
-      fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
-      fs.deleteOnExit(file);
-      
-      byte[] encodedPw = Base64.encodeBase64(passwd);
-      fos.writeInt(encodedPw.length);
-      fos.write(encodedPw);
-      fos.close();
-      
-      DistributedCache.addCacheFile(file.toUri(), conf);
-    } catch (IOException ioe) {
-      throw new RuntimeException(ioe);
-    }
-
   }
   
   /**
@@ -306,24 +285,17 @@ public abstract class InputFormatBase<K,
    */
   public static void setRanges(Configuration conf, Collection<Range> ranges) {
     ArgumentChecker.notNull(ranges);
+    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
     try {
-      FileSystem fs = FileSystem.get(conf);
-      Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".ranges");
-      conf.set(RANGES, file.toString());
-      FSDataOutputStream fos = fs.create(file, false);
-      fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
-      fs.deleteOnExit(file);
-      
-      fos.writeInt(ranges.size());
       for (Range r : ranges) {
-        r.write(fos);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        r.write(new DataOutputStream(baos));
+        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
       }
-      fos.close();
-      
-      DistributedCache.addCacheFile(file.toUri(), conf);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to write ranges to file", e);
+    } catch (IOException ex) {
+      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
     }
+    conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
   }
   
   /**
@@ -660,31 +632,26 @@ public abstract class InputFormatBase<K,
   }
   
   /**
-   * @throws IOException
+   * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; it is BASE64 encoded to provide a charset-safe conversion to a
+   * string, and is not intended to be secure.
+   * 
    * @deprecated Use {@link #getPassword(Configuration)} instead
    */
-  protected static byte[] getPassword(JobContext job) throws IOException {
+  protected static byte[] getPassword(JobContext job) {
     return getPassword(job.getConfiguration());
   }
   
   /**
+   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; it is BASE64 encoded to
+   * provide a charset-safe conversion to a string, and is not intended to be secure.
+   * 
    * @param conf
    *          the Hadoop configuration object
    * @return the BASE64-encoded password
-   * @throws IOException
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
-  protected static byte[] getPassword(Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path file = new Path(conf.get(PASSWORD_PATH));
-    
-    FSDataInputStream fdis = fs.open(file);
-    int length = fdis.readInt();
-    byte[] encodedPassword = new byte[length];
-    fdis.read(encodedPassword);
-    fdis.close();
-    
-    return Base64.decodeBase64(encodedPassword);
+  protected static byte[] getPassword(Configuration conf) {
+    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
   }
   
   /**
@@ -751,7 +718,7 @@ public abstract class InputFormatBase<K,
   /**
    * @deprecated Use {@link #getTabletLocator(Configuration)} instead
    */
-  protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException, IOException {
+  protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException {
     return getTabletLocator(job.getConfiguration());
   }
   
@@ -763,10 +730,8 @@ public abstract class InputFormatBase<K,
    * @return an accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
-   * @throws IOException
-   *           if the input format is unable to read the password file from the FileSystem
    */
-  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException {
+  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
     if (conf.getBoolean(MOCK, false))
       return new MockTabletLocator();
     Instance instance = getInstance(conf);
@@ -796,21 +761,12 @@ public abstract class InputFormatBase<K,
    */
   protected static List<Range> getRanges(Configuration conf) throws IOException {
     ArrayList<Range> ranges = new ArrayList<Range>();
-    FileSystem fs = FileSystem.get(conf);
-    String rangePath = conf.get(RANGES);
-    if (rangePath == null)
-      return ranges;
-    Path file = new Path(rangePath);
-    
-    FSDataInputStream fdis = fs.open(file);
-    int numRanges = fdis.readInt();
-    while (numRanges > 0) {
-      Range r = new Range();
-      r.readFields(fdis);
-      ranges.add(r);
-      numRanges--;
+    for (String rangeString : conf.getStringCollection(RANGES)) {
+      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
+      Range range = new Range();
+      range.readFields(new DataInputStream(bais));
+      ranges.add(range);
     }
-    fdis.close();
     return ranges;
   }
   
@@ -1227,7 +1183,7 @@ public abstract class InputFormatBase<K,
   }
   
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext job, String tableName, List<Range> ranges) throws TableNotFoundException,
-      AccumuloException, AccumuloSecurityException, IOException {
+      AccumuloException, AccumuloSecurityException {
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 

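The restored setRanges/getRanges pair, visible in the hunks above, serializes each Range through its Writable methods and stores the bytes Base64-encoded as Configuration strings. A self-contained round-trip sketch of the same technique (the configuration key and row values are placeholders):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.accumulo.core.data.Range;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.hadoop.conf.Configuration;

    public class RangeEncodingSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Range original = new Range("row_000", "row_999");

        // Encode: Writable bytes -> Base64 -> Configuration string.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(baos));
        conf.setStrings("example.ranges",
            new String(Base64.encodeBase64(baos.toByteArray())));

        // Decode: Configuration string -> Base64 bytes -> Writable.
        for (String s : conf.getStringCollection("example.ranges")) {
          Range decoded = new Range();
          decoded.readFields(new DataInputStream(
              new ByteArrayInputStream(Base64.decodeBase64(s.getBytes()))));
          System.out.println(original.equals(decoded)); // prints true
        }
      }
    }
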
Modified: accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1400976&r1=1400975&r2=1400976&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original)
+++ accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Mon Oct 22 18:08:17 2012
@@ -20,8 +20,6 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -34,7 +32,6 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.Authorizations;
@@ -104,24 +101,6 @@ public class AccumuloInputFormatTest {
     String iterators = conf.get("AccumuloInputFormat.iterators");
     assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators);
   }
-
-  static abstract class GetRanges<K, V> extends InputFormatBase<K,V> {
-    public static List<Range> getRanges(Configuration conf) throws IOException {
-      return InputFormatBase.getRanges(conf);
-    }
-  };
-
-  @Test
-  public void testSetRanges() throws IOException {
-    JobContext job = new JobContext(new Configuration(), new JobID());
-    List<Range> ranges = new ArrayList<Range>();
-    for (int i = 0; i < 100000; i++) {
-      ranges.add(new Range(new Text(String.format("%05x", i))));
-    }
-    AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
-    List<Range> ranges2 = GetRanges.getRanges(job.getConfiguration());
-    assertEquals(ranges, ranges2);
-  }
   
   @Test
   public void testAddIterator() {
@@ -298,17 +277,14 @@ public class AccumuloInputFormatTest {
   
   static class TestMapper extends Mapper<Key,Value,Key,Value> {
     Key key = null;
-    int first = 0;
     int count = 0;
     
     @Override
     protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
       if (key != null)
         assertEquals(key.getRow().toString(), new String(v.get()));
-      else
-        first = Integer.parseInt(k.getRow().toString(), 16) - 1;
-      assertEquals(k.getRow(), new Text(String.format("%09x", first + count + 1)));
-      assertEquals(new String(v.get()), String.format("%09x", first + count));
+      assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+      assertEquals(new String(v.get()), String.format("%09x", count));
       key = new Key(k);
       count++;
     }
@@ -333,14 +309,10 @@ public class AccumuloInputFormatTest {
     job.setNumReduceTasks(0);
     AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
     AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
-    HashSet<Range> ranges = new HashSet<Range>();
-    ranges.add(new Range("000000000", "000000010"));
-    ranges.add(new Range("000000100", "000000110"));
-    AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
     
     AccumuloInputFormat input = new AccumuloInputFormat();
     List<InputSplit> splits = input.getSplits(job);
-    assertEquals(splits.size(), 2);
+    assertEquals(splits.size(), 1);
     
     TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
     for (InputSplit split : splits) {
@@ -350,7 +322,6 @@ public class AccumuloInputFormatTest {
       Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split);
       reader.initialize(split, context);
       mapper.run(context);
-      assertEquals(mapper.count, 16);
     }
   }
   
@@ -371,9 +342,7 @@ public class AccumuloInputFormatTest {
     AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations());
     AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
     AccumuloInputFormat input = new AccumuloInputFormat();
-    List<InputSplit> splits = input.getSplits(job);
-    assertEquals(splits.size(), 1);
-    RangeInputSplit ris = (RangeInputSplit) splits.get(0);
+    RangeInputSplit ris = new RangeInputSplit();
     TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
     RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
     rr.initialize(ris, tac);
@@ -383,6 +352,5 @@ public class AccumuloInputFormatTest {
     while (rr.nextKeyValue()) {
       mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context);
     }
-    assertEquals(mapper.count, 100);
   }
 }