You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/10/22 20:08:18 UTC
svn commit: r1400976 - in /accumulo/branches/1.4/src/core/src:
main/java/org/apache/accumulo/core/client/mapreduce/
test/java/org/apache/accumulo/core/client/mapreduce/
Author: kturner
Date: Mon Oct 22 18:08:17 2012
New Revision: 1400976
URL: http://svn.apache.org/viewvc?rev=1400976&view=rev
Log:
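ACCUMULO-826, ACCUMULO-507: reverted revisions 1397700, 1382923, 1339308, 1339223, and 1336322. These changes caused MapReduce jobs to fail if the process that started the job exited. The password and range files that these revisions wrote to the FileSystem were registered with deleteOnExit, so they were removed when the submitting process exited, while the tasks still needed to read them.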
Modified:
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1400976&r1=1400975&r2=1400976&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Mon Oct 22 18:08:17 2012
@@ -38,13 +38,6 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.util.ArgumentChecker;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -75,7 +68,7 @@ public class AccumuloOutputFormat extend
private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
private static final String USERNAME = PREFIX + ".username";
- private static final String PASSWORD_PATH = PREFIX + ".password";
+ private static final String PASSWORD = PREFIX + ".password";
private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable";
private static final String INSTANCE_NAME = PREFIX + ".instanceName";
@@ -135,28 +128,10 @@ public class AccumuloOutputFormat extend
ArgumentChecker.notNull(user, passwd);
conf.set(USERNAME, user);
+ conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
conf.setBoolean(CREATETABLES, createTables);
if (defaultTable != null)
conf.set(DEFAULT_TABLE_NAME, defaultTable);
-
- try {
- FileSystem fs = FileSystem.get(conf);
- Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
- conf.set(PASSWORD_PATH, file.toString());
- FSDataOutputStream fos = fs.create(file, false);
- fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
- fs.deleteOnExit(file);
-
- byte[] encodedPw = Base64.encodeBase64(passwd);
- fos.writeInt(encodedPw.length);
- fos.write(encodedPw);
- fos.close();
-
- DistributedCache.addCacheFile(file.toUri(), conf);
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
}
/**
@@ -257,28 +232,21 @@ public class AccumuloOutputFormat extend
}
/**
- * @throws IOException
+ * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a
+ * string, and is not intended to be secure.
*
* @deprecated Use {@link #getPassword(Configuration)} instead
*/
- protected static byte[] getPassword(JobContext job) throws IOException {
+ protected static byte[] getPassword(JobContext job) {
return getPassword(job.getConfiguration());
}
/**
- * @throws IOException
+ * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a
+ * string, and is not intended to be secure.
*/
- protected static byte[] getPassword(Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- Path file = new Path(conf.get(PASSWORD_PATH));
-
- FSDataInputStream fdis = fs.open(file);
- int length = fdis.readInt();
- byte[] encodedPassword = new byte[length];
- fdis.read(encodedPassword);
- fdis.close();
-
- return Base64.decodeBase64(encodedPassword);
+ protected static byte[] getPassword(Configuration conf) {
+ return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
}
/**
@@ -386,7 +354,7 @@ public class AccumuloOutputFormat extend
private Connector conn;
- AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException, IOException {
+ AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException {
Level l = getLogLevel(attempt);
if (l != null)
log.setLevel(getLogLevel(attempt));
Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1400976&r1=1400975&r2=1400976&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Mon Oct 22 18:08:17 2012
@@ -16,8 +16,12 @@
*/
package org.apache.accumulo.core.client.mapreduce;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
@@ -77,13 +81,6 @@ import org.apache.accumulo.core.util.Tex
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -118,7 +115,7 @@ public abstract class InputFormatBase<K,
private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
private static final String USERNAME = PREFIX + ".username";
- private static final String PASSWORD_PATH = PREFIX + ".password";
+ private static final String PASSWORD = PREFIX + ".password";
private static final String TABLE_NAME = PREFIX + ".tablename";
private static final String AUTHORIZATIONS = PREFIX + ".authorizations";
@@ -217,28 +214,10 @@ public abstract class InputFormatBase<K,
ArgumentChecker.notNull(user, passwd, table);
conf.set(USERNAME, user);
+ conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
conf.set(TABLE_NAME, table);
if (auths != null && !auths.isEmpty())
conf.set(AUTHORIZATIONS, auths.serialize());
-
- try {
- FileSystem fs = FileSystem.get(conf);
- Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
- conf.set(PASSWORD_PATH, file.toString());
- FSDataOutputStream fos = fs.create(file, false);
- fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
- fs.deleteOnExit(file);
-
- byte[] encodedPw = Base64.encodeBase64(passwd);
- fos.writeInt(encodedPw.length);
- fos.write(encodedPw);
- fos.close();
-
- DistributedCache.addCacheFile(file.toUri(), conf);
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
}
/**
@@ -306,24 +285,17 @@ public abstract class InputFormatBase<K,
*/
public static void setRanges(Configuration conf, Collection<Range> ranges) {
ArgumentChecker.notNull(ranges);
+ ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
try {
- FileSystem fs = FileSystem.get(conf);
- Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".ranges");
- conf.set(RANGES, file.toString());
- FSDataOutputStream fos = fs.create(file, false);
- fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
- fs.deleteOnExit(file);
-
- fos.writeInt(ranges.size());
for (Range r : ranges) {
- r.write(fos);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ r.write(new DataOutputStream(baos));
+ rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
}
- fos.close();
-
- DistributedCache.addCacheFile(file.toUri(), conf);
- } catch (IOException e) {
- throw new RuntimeException("Unable to write ranges to file", e);
+ } catch (IOException ex) {
+ throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
}
+ conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
}
/**
@@ -660,31 +632,26 @@ public abstract class InputFormatBase<K,
}
/**
- * @throws IOException
+ * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a
+ * string, and is not intended to be secure.
+ *
* @deprecated Use {@link #getPassword(Configuration)} instead
*/
- protected static byte[] getPassword(JobContext job) throws IOException {
+ protected static byte[] getPassword(JobContext job) {
return getPassword(job.getConfiguration());
}
/**
+ * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
+ * provide a charset safe conversion to a string, and is not intended to be secure.
+ *
* @param conf
* the Hadoop configuration object
* @return the BASE64-encoded password
- * @throws IOException
* @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
*/
- protected static byte[] getPassword(Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- Path file = new Path(conf.get(PASSWORD_PATH));
-
- FSDataInputStream fdis = fs.open(file);
- int length = fdis.readInt();
- byte[] encodedPassword = new byte[length];
- fdis.read(encodedPassword);
- fdis.close();
-
- return Base64.decodeBase64(encodedPassword);
+ protected static byte[] getPassword(Configuration conf) {
+ return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
}
/**
@@ -751,7 +718,7 @@ public abstract class InputFormatBase<K,
/**
* @deprecated Use {@link #getTabletLocator(Configuration)} instead
*/
- protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException, IOException {
+ protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException {
return getTabletLocator(job.getConfiguration());
}
@@ -763,10 +730,8 @@ public abstract class InputFormatBase<K,
* @return an accumulo tablet locator
* @throws TableNotFoundException
* if the table name set on the configuration doesn't exist
- * @throws IOException
- * if the input format is unable to read the password file from the FileSystem
*/
- protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException {
+ protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
if (conf.getBoolean(MOCK, false))
return new MockTabletLocator();
Instance instance = getInstance(conf);
@@ -796,21 +761,12 @@ public abstract class InputFormatBase<K,
*/
protected static List<Range> getRanges(Configuration conf) throws IOException {
ArrayList<Range> ranges = new ArrayList<Range>();
- FileSystem fs = FileSystem.get(conf);
- String rangePath = conf.get(RANGES);
- if (rangePath == null)
- return ranges;
- Path file = new Path(rangePath);
-
- FSDataInputStream fdis = fs.open(file);
- int numRanges = fdis.readInt();
- while (numRanges > 0) {
- Range r = new Range();
- r.readFields(fdis);
- ranges.add(r);
- numRanges--;
+ for (String rangeString : conf.getStringCollection(RANGES)) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
+ Range range = new Range();
+ range.readFields(new DataInputStream(bais));
+ ranges.add(range);
}
- fdis.close();
return ranges;
}
@@ -1227,7 +1183,7 @@ public abstract class InputFormatBase<K,
}
Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext job, String tableName, List<Range> ranges) throws TableNotFoundException,
- AccumuloException, AccumuloSecurityException, IOException {
+ AccumuloException, AccumuloSecurityException {
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
Modified: accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1400976&r1=1400975&r2=1400976&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original)
+++ accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Mon Oct 22 18:08:17 2012
@@ -20,8 +20,6 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import org.apache.accumulo.core.client.BatchWriter;
@@ -34,7 +32,6 @@ import org.apache.accumulo.core.client.m
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
@@ -104,24 +101,6 @@ public class AccumuloInputFormatTest {
String iterators = conf.get("AccumuloInputFormat.iterators");
assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators);
}
-
- static abstract class GetRanges<K, V> extends InputFormatBase<K,V> {
- public static List<Range> getRanges(Configuration conf) throws IOException {
- return InputFormatBase.getRanges(conf);
- }
- };
-
- @Test
- public void testSetRanges() throws IOException {
- JobContext job = new JobContext(new Configuration(), new JobID());
- List<Range> ranges = new ArrayList<Range>();
- for (int i = 0; i < 100000; i++) {
- ranges.add(new Range(new Text(String.format("%05x", i))));
- }
- AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
- List<Range> ranges2 = GetRanges.getRanges(job.getConfiguration());
- assertEquals(ranges, ranges2);
- }
@Test
public void testAddIterator() {
@@ -298,17 +277,14 @@ public class AccumuloInputFormatTest {
static class TestMapper extends Mapper<Key,Value,Key,Value> {
Key key = null;
- int first = 0;
int count = 0;
@Override
protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
if (key != null)
assertEquals(key.getRow().toString(), new String(v.get()));
- else
- first = Integer.parseInt(k.getRow().toString(), 16) - 1;
- assertEquals(k.getRow(), new Text(String.format("%09x", first + count + 1)));
- assertEquals(new String(v.get()), String.format("%09x", first + count));
+ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", count));
key = new Key(k);
count++;
}
@@ -333,14 +309,10 @@ public class AccumuloInputFormatTest {
job.setNumReduceTasks(0);
AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
- HashSet<Range> ranges = new HashSet<Range>();
- ranges.add(new Range("000000000", "000000010"));
- ranges.add(new Range("000000100", "000000110"));
- AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
AccumuloInputFormat input = new AccumuloInputFormat();
List<InputSplit> splits = input.getSplits(job);
- assertEquals(splits.size(), 2);
+ assertEquals(splits.size(), 1);
TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
for (InputSplit split : splits) {
@@ -350,7 +322,6 @@ public class AccumuloInputFormatTest {
Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split);
reader.initialize(split, context);
mapper.run(context);
- assertEquals(mapper.count, 16);
}
}
@@ -371,9 +342,7 @@ public class AccumuloInputFormatTest {
AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations());
AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
AccumuloInputFormat input = new AccumuloInputFormat();
- List<InputSplit> splits = input.getSplits(job);
- assertEquals(splits.size(), 1);
- RangeInputSplit ris = (RangeInputSplit) splits.get(0);
+ RangeInputSplit ris = new RangeInputSplit();
TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
rr.initialize(ris, tac);
@@ -383,6 +352,5 @@ public class AccumuloInputFormatTest {
while (rr.nextKeyValue()) {
mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context);
}
- assertEquals(mapper.count, 100);
}
}