You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2011/12/23 18:53:13 UTC
svn commit: r1222766 [1/3] - in /incubator/accumulo/trunk:
src/core/src/main/java/org/apache/accumulo/core/
src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/
src/core/src/main/java/org/apache/accumulo/core/file/
src/core/src/main/java...
Author: vines
Date: Fri Dec 23 17:53:12 2011
New Revision: 1222766
URL: http://svn.apache.org/viewvc?rev=1222766&view=rev
Log:
Accumulo-227 - switch the in-memory counts from mutation counts to column counts. The count is an int, not a long, so there is a possibility that it could overflow and break something in the distant future. However, this may be changed as part of a larger change to the functioning of the system.
Accumulo-149 - first phase, MyMapFile.Writer and MySequenceFile.Writer are purged.
Added:
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java (with props)
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java (contents, props changed)
- copied, changed from r1215244, incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java
Removed:
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java
Modified:
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java
incubator/accumulo/trunk/test/system/auto/simple/compaction.py
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java Fri Dec 23 17:53:12 2011
@@ -155,6 +155,7 @@ public class Constants {
public static final String CORE_PACKAGE_NAME = "org.apache.accumulo.core";
public static final String OLD_PACKAGE_NAME = "cloudbase";
public static final String VALID_TABLE_NAME_REGEX = "^\\w+$";
+ public static final String MAPFILE_EXTENSION = "map";
public static String getBaseDir(AccumuloConfiguration conf) {
return conf.get(Property.INSTANCE_DFS_DIR);
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java Fri Dec 23 17:53:12 2011
@@ -6262,6 +6262,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -7819,6 +7821,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -14033,6 +14037,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -15162,6 +15168,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -21123,6 +21131,8 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java Fri Dec 23 17:53:12 2011
@@ -21,13 +21,13 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.file.map.MapFileOperations;
-import org.apache.accumulo.core.file.map.MyMapFile;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.file.rfile.RFileOperations;
import org.apache.hadoop.conf.Configuration;
@@ -41,7 +41,7 @@ class DispatchingFileFactory extends Fil
Path p = new Path(file);
String name = p.getName();
- if (name.startsWith(MyMapFile.EXTENSION + "_")) {
+ if (name.startsWith(Constants.MAPFILE_EXTENSION + "_")) {
return new MapFileOperations();
}
@@ -53,7 +53,7 @@ class DispatchingFileFactory extends Fil
String extension = sp[1];
- if (extension.equals(MyMapFile.EXTENSION) || extension.equals(MyMapFile.EXTENSION + "_tmp")) {
+ if (extension.equals(Constants.MAPFILE_EXTENSION) || extension.equals(Constants.MAPFILE_EXTENSION + "_tmp")) {
return new MapFileOperations();
} else if (extension.equals(RFile.EXTENSION) || extension.equals(RFile.EXTENSION + "_tmp")) {
return new RFileOperations();
@@ -140,7 +140,7 @@ class DispatchingFileFactory extends Fil
public abstract class FileOperations {
- private static final HashSet<String> validExtensions = new HashSet<String>(Arrays.asList(MyMapFile.EXTENSION, RFile.EXTENSION));
+ private static final HashSet<String> validExtensions = new HashSet<String>(Arrays.asList(Constants.MAPFILE_EXTENSION, RFile.EXTENSION));
public static Set<String> getValidExtensions() {
return validExtensions;
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java Fri Dec 23 17:53:12 2011
@@ -25,10 +25,10 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.Map.Entry;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -37,8 +37,8 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.map.MyMapFile;
-import org.apache.accumulo.core.file.map.MySequenceFile;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.MultiIterator;
import org.apache.accumulo.core.util.CachedConfiguration;
@@ -46,7 +46,6 @@ import org.apache.accumulo.core.util.Loc
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -120,13 +119,13 @@ public class FileUtil {
start = end;
- String newMapFile = String.format("%s/" + MyMapFile.EXTENSION + "_%04d", newDir, count++);
+ String newMapFile = String.format("%s/" + RFile.EXTENSION + "_%04d", newDir, count++);
fs.mkdirs(new Path(newMapFile));
- Path outFile = new Path(String.format("%s/index", newMapFile));
+ String outFile = String.format("%s/index", newMapFile);
outFiles.add(newMapFile);
- MySequenceFile.Writer writer = MySequenceFile.createWriter(fs, conf, outFile, Key.class, LongWritable.class, MySequenceFile.CompressionType.BLOCK);
+ FileSKVWriter writer = new RFileOperations().openWriter(outFile, fs, conf, acuConf);
List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
FileSKVIterator reader = null;
@@ -145,7 +144,7 @@ public class FileUtil {
boolean lteEndRow = endRow == null || key.compareRow(endRow) <= 0;
if (gtPrevEndRow && lteEndRow)
- writer.append(key, new LongWritable(0));
+ writer.append(key, new Value(new byte[0]));
if (!lteEndRow)
break;
@@ -181,7 +180,7 @@ public class FileUtil {
return reduceFiles(acuConf, conf, fs, prevEndRow, endRow, outFiles, maxFiles, tmpDir, pass + 1);
}
-
+
public static SortedMap<Double,Key> findMidPoint(FileSystem fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
double minSplit) throws IOException {
return findMidPoint(fs, acuConf, prevEndRow, endRow, mapFiles, minSplit, true);
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java Fri Dec 23 17:53:12 2011
@@ -17,7 +17,6 @@
package org.apache.accumulo.core.file.map;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -146,44 +145,9 @@ public class MapFileOperations extends F
@Override
public FileSKVWriter openWriter(final String file, final FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- final MyMapFile.Writer mfw = MapFileUtil.openMapFileWriter(acuconf, conf, fs, file);
- return new FileSKVWriter() {
-
- boolean secondCall = false;
-
- @Override
- public void append(Key key, Value value) throws IOException {
- mfw.append(new Key(key), value);
- }
-
- @Override
- public void close() throws IOException {
- mfw.close();
- }
-
- @Override
- public DataOutputStream createMetaStore(String name) throws IOException {
- return fs.create(new Path(file, name), false);
- }
-
- @Override
- public void startDefaultLocalityGroup() throws IOException {
- if (secondCall)
- throw new IllegalStateException("Start default locality group called twice");
-
- secondCall = true;
- }
-
- @Override
- public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean supportsLocalityGroups() {
- return false;
- }
- };
+
+ throw new UnsupportedOperationException();
+
}
@Override
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java Fri Dec 23 17:53:12 2011
@@ -19,47 +19,17 @@ package org.apache.accumulo.core.file.ma
import java.io.IOException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
public class MapFileUtil {
- private static final Logger log = Logger.getLogger(MapFileUtil.class);
-
- public static boolean attemptToFixMapFile(Configuration conf, FileSystem fs, String dirName) {
- boolean fixed = true;
- try {
- log.info("Attempting to fix mapfile " + dirName);
- Path indexFile = new Path(dirName + "/" + MyMapFile.INDEX_FILE_NAME);
- if (fs.exists(indexFile) && fs.getFileStatus(indexFile).getLen() == 0) {
- log.info("Deleting 0 length index file " + indexFile);
- fs.delete(indexFile, false);
- }
-
- MyMapFile.fix(fs, new Path(dirName), Key.class, Value.class, false, conf);
- } catch (Exception e) {
- log.error("Failed to fix mapfile " + dirName, e);
- fixed = false;
- }
-
- return fixed;
- }
-
public static MyMapFile.Reader openMapFile(AccumuloConfiguration acuconf, FileSystem fs, String dirName, Configuration conf) throws IOException {
MyMapFile.Reader mfr = null;
try {
mfr = new MyMapFile.Reader(fs, dirName, conf);
return mfr;
} catch (IOException e) {
- if (attemptToFixMapFile(conf, fs, dirName)) {
- log.info("Fixed mapfile " + dirName);
- mfr = new MyMapFile.Reader(fs, dirName, conf);
- return mfr;
- }
throw e;
}
}
@@ -71,46 +41,7 @@ public class MapFileUtil {
index = new MySequenceFile.Reader(fs, indexPath, conf);
return index;
} catch (IOException e) {
- if (attemptToFixMapFile(conf, fs, mapFile.toString())) {
- log.info("Fixed mapfile " + mapFile);
- index = new MySequenceFile.Reader(fs, indexPath, conf);
- return index;
- }
throw e;
}
}
-
- public static MyMapFile.Writer openMapFileWriter(AccumuloConfiguration acuTableConf, Configuration conf, FileSystem fs, String dirname) throws IOException {
- MyMapFile.Writer mfw = null;
- int hbs = conf.getInt("io.seqfile.compress.blocksize", -1);
- int hrep = conf.getInt("dfs.replication", -1);
-
- // dfs.replication
-
- Configuration newConf = null;
-
- int tbs = (int) acuTableConf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
- int trep = acuTableConf.getCount(Property.TABLE_FILE_REPLICATION);
-
- if (hbs != tbs) {
- newConf = new Configuration(conf);
- newConf.setInt("io.seqfile.compress.blocksize", tbs);
- }
-
- if (fs.exists(new Path(dirname)))
- log.error("Map file " + dirname + " already exists", new Exception());
-
- if (newConf != null)
- conf = newConf;
-
- mfw = new MyMapFile.Writer(conf, fs, dirname, Key.class, Value.class, MySequenceFile.CompressionType.BLOCK);
-
- if (trep > 0 && trep != hrep) {
- // tried to set dfs.replication property on conf obj, however this was ignored, so have to manually set the prop
- fs.setReplication(new Path(dirname + "/" + MyMapFile.DATA_FILE_NAME), (short) trep);
- fs.setReplication(new Path(dirname + "/" + MyMapFile.INDEX_FILE_NAME), (short) trep);
- }
-
- return mfw;
- }
}
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java Fri Dec 23 17:53:12 2011
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.data.Ran
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.NoSuchMetaStoreException;
-import org.apache.accumulo.core.file.map.MySequenceFile.CompressionType;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -45,17 +44,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableName;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
/**
@@ -75,8 +68,6 @@ import org.apache.log4j.Logger;
@SuppressWarnings({"rawtypes", "unchecked"})
public class MyMapFile {
- public static final String EXTENSION = "map";
-
private static final Logger log = Logger.getLogger(MyMapFile.class);
/** The name of the index file. */
@@ -95,161 +86,6 @@ public class MyMapFile {
protected MyMapFile() {} // no public ctor
- /** Writes a new map. */
- public static class Writer {
- private MySequenceFile.Writer data;
- private MySequenceFile.Writer index;
-
- final private static String INDEX_INTERVAL = "io.map.index.interval";
- private int indexInterval = 128;
-
- private long size;
- private LongWritable position = new LongWritable();
-
- // the following fields are used only for checking key order
- private WritableComparator comparator;
- private DataInputBuffer inBuf = new DataInputBuffer();
- private DataOutputBuffer outBuf = new DataOutputBuffer();
- private WritableComparable lastKey;
-
- /** Create the named map for keys of the named class. */
- public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass) throws IOException {
- this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, MySequenceFile.getCompressionType(conf));
- }
-
- /** Create the named map for keys of the named class. */
- public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, CompressionType compress, Progressable progress)
- throws IOException {
- this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress, progress);
- }
-
- /** Create the named map for keys of the named class. */
- public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, CompressionType compress, CompressionCodec codec,
- Progressable progress) throws IOException {
- this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress, codec, progress);
- }
-
- /** Create the named map for keys of the named class. */
- public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, CompressionType compressionType) throws IOException {
- this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compressionType);
- }
-
- /** Create the named map using the named key comparator. */
- public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass) throws IOException {
- this(conf, fs, dirName, comparator, valClass, MySequenceFile.getCompressionType(conf));
- }
-
- /** Create the named map using the named key comparator. */
- public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, MySequenceFile.CompressionType compress)
- throws IOException {
- this(conf, fs, dirName, comparator, valClass, compress, null);
- }
-
- /** Create the named map using the named key comparator. */
- public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, MySequenceFile.CompressionType compress,
- Progressable progress) throws IOException {
- this(conf, fs, dirName, comparator, valClass, compress, new DefaultCodec(), progress);
- }
-
- /** Create the named map using the named key comparator. */
- public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, MySequenceFile.CompressionType compress,
- CompressionCodec codec, Progressable progress) throws IOException {
-
- // LOG.debug("Opening map file "+dirName+" for write");
-
- this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
-
- this.comparator = comparator;
- this.lastKey = comparator.newKey();
-
- Path dir = new Path(dirName);
- if (!fs.mkdirs(dir)) {
- throw new IOException("Mkdirs failed to create directory " + dir.toString());
- }
- Path dataFile = new Path(dir, DATA_FILE_NAME);
- Path indexFile = new Path(dir, INDEX_FILE_NAME);
-
- Class keyClass = comparator.getKeyClass();
- this.data = MySequenceFile.createWriter(fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
- this.index = MySequenceFile.createWriter(fs, conf, indexFile, keyClass, LongWritable.class, CompressionType.BLOCK, progress);
- }
-
- /** The number of entries that are added before an index entry is added. */
- public int getIndexInterval() {
- return indexInterval;
- }
-
- /**
- * Sets the index interval.
- *
- * @see #getIndexInterval()
- */
- public void setIndexInterval(int interval) {
- indexInterval = interval;
- }
-
- /**
- * Sets the index interval and stores it in conf
- *
- * @see #getIndexInterval()
- */
- public static void setIndexInterval(Configuration conf, int interval) {
- conf.setInt(INDEX_INTERVAL, interval);
- }
-
- /** Close the map. */
- public synchronized void close() throws IOException {
-
- // LOG.debug("Closing map file "+myDir+" for write");
-
- data.close();
- index.close();
- }
-
- /**
- * Append a key/value pair to the map. The key must be greater or equal to the previous key added to the map.
- */
- public synchronized void append(WritableComparable key, Writable val) throws IOException {
-
- checkKey(key);
-
- /*******************************************************************
- * Instead of storing index values for every 128th key that all point back to the same compressed block, we can store one key for each compressed block.
- */
- if (data.isBlockCompressed()) {
- // add an index entry when the data size changes, indicating a
- // new compressed block is added
- // also add an index entry for the first value
- if (size == 0 || position.get() != data.getLength()) {
- position.set(data.getLength());
- index.append(key, position);
- }
- } else {
- if (size % indexInterval == 0) { // add an index entry
- position.set(data.getLength()); // point to current eof
- index.append(key, position);
- }
- }
-
- data.append(key, val); // append key/value to data
- size++;
- }
-
- private void checkKey(WritableComparable key) throws IOException {
- // check that keys are well-ordered
- if (size != 0 && comparator.compare(lastKey, key) > 0)
- throw new IOException("key out of order: " + key + " after " + lastKey);
-
- // update lastKey with a copy of key by writing and reading
- outBuf.reset();
- key.write(outBuf); // write new key
-
- inBuf.reset(outBuf.getData(), outBuf.getLength());
- lastKey.readFields(inBuf); // read into lastKey
- }
-
- }
-
/** Provide access to an existing map. */
public static class Reader implements FileSKVIterator {
@@ -916,132 +752,4 @@ public class MyMapFile {
return indexInterval;
}
- /**
- * This method attempts to fix a corrupt MapFile by re-creating its index.
- *
- * Code copied from hadoop (0.18.0) because it was broken there. This is a fixed version. Do not want to loose changes if MyMapFile is updated.
- *
- * @param fs
- * filesystem
- * @param dir
- * directory containing the MapFile data and index
- * @param keyClass
- * key class (has to be a subclass of Writable)
- * @param valueClass
- * value class (has to be a subclass of Writable)
- * @param dryrun
- * do not perform any changes, just report what needs to be done
- * @return number of valid entries in this MapFile, or -1 if no fixing was needed
- */
- public static long fix(FileSystem fs, Path dir, Class<? extends WritableComparable> keyClass, Class<? extends Writable> valueClass, boolean dryrun,
- Configuration conf) throws Exception {
- String dr = (dryrun ? "[DRY RUN ] " : "");
- Path data = new Path(dir, DATA_FILE_NAME);
- Path index = new Path(dir, INDEX_FILE_NAME);
- int indexInterval = getIndexInterval();
- if (!fs.exists(data)) {
- // there's nothing we can do to fix this!
- throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
- }
- if (fs.exists(index)) {
- // no fixing needed
- return -1;
- }
- MySequenceFile.Reader dataReader = null;
- MySequenceFile.Writer indexWriter = null;
- long cnt = 0L;
- try {
- dataReader = new MySequenceFile.Reader(fs, data, conf);
- if (!dataReader.getKeyClass().equals(keyClass)) {
- throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() + ", got " + dataReader.getKeyClass().getName());
- }
- if (!dataReader.getValueClass().equals(valueClass)) {
- throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() + ", got " + dataReader.getValueClass().getName());
- }
- Writable key = ReflectionUtils.newInstance(keyClass, conf);
- Writable value = ReflectionUtils.newInstance(valueClass, conf);
- if (!dryrun)
- indexWriter = MySequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
- long currentPos = 0L;
- long lastPos = 0L;
-
- LongWritable position = new LongWritable();
- lastPos = dataReader.getPosition();
-
- boolean blockCompressed = dataReader.isBlockCompressed();
-
- if (!blockCompressed) {
- currentPos = lastPos;
- }
-
- while (dataReader.next(key, value)) {
- if (blockCompressed) {
- if (cnt == 0) {
- currentPos = dataReader.getPosition();
- } else {
- long pos = dataReader.getPosition();
- if (pos != currentPos) {
- lastPos = currentPos;
- currentPos = pos;
- }
- }
- // write an index entry at position 0 and whenever the position changes
- if (cnt == 0 || position.get() != lastPos) {
- position.set(lastPos);
- if (!dryrun)
- indexWriter.append(key, position);
- }
- } else {
- if (cnt % indexInterval == 0) {
- position.set(currentPos);
- if (!dryrun)
- indexWriter.append(key, position);
- }
- long pos = dataReader.getPosition();
- if (pos != currentPos) {
- lastPos = currentPos;
- currentPos = pos;
- }
- }
- cnt++;
-
- }
- } catch (Throwable t) {
- // truncated data file. swallow it.
- log.error("Exception when trying to fix map file " + dir, t);
- } finally {
- if (dataReader != null)
- dataReader.close();
- if (indexWriter != null)
- indexWriter.close();
- }
- return cnt;
- }
-
- public static void main(String[] args) throws Exception {
- String usage = "Usage: MapFile inFile outFile";
-
- if (args.length != 2) {
- System.err.println(usage);
- System.exit(1);
- }
-
- String in = args[0];
- String out = args[1];
-
- Configuration conf = CachedConfiguration.getInstance();
- FileSystem fs = FileSystem.getLocal(conf);
- MyMapFile.Reader reader = new MyMapFile.Reader(fs, in, conf);
- MyMapFile.Writer writer = new MyMapFile.Writer(conf, fs, out, reader.getKeyClass(), reader.getValueClass());
-
- WritableComparable key = (WritableComparable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-
- while (reader.next(key, value))
- // copy all entries
- writer.append(key, value);
-
- writer.close();
- }
-
}