You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2011/12/23 18:53:13 UTC
svn commit: r1222766 [3/3] - in /incubator/accumulo/trunk:
src/core/src/main/java/org/apache/accumulo/core/
src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/
src/core/src/main/java/org/apache/accumulo/core/file/
src/core/src/main/java...
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java Fri Dec 23 17:53:12 2011
@@ -40,9 +40,9 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.map.MapFileOperations;
-import org.apache.accumulo.core.file.map.MyMapFile;
-import org.apache.accumulo.core.file.map.MySequenceFile;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SkippingIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -71,7 +71,7 @@ class MemKeyComparator implements Compar
if (cmp == 0) {
if (k1 instanceof MemKey)
if (k2 instanceof MemKey)
- cmp = ((MemKey) k2).mutationCount - ((MemKey) k1).mutationCount;
+ cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount;
else
cmp = 1;
else if (k2 instanceof MemKey)
@@ -84,22 +84,22 @@ class MemKeyComparator implements Compar
class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator {
- int maxMutationCount;
+ int kvCount;
- public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxMutationCount) {
+ public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) {
setSource(source);
- this.maxMutationCount = maxMutationCount;
+ this.kvCount = maxKVCount;
}
@Override
protected void consume() throws IOException {
- while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).mutationCount > maxMutationCount)
+ while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount)
getSource().next();
}
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- return new PartialMutationSkippingIterator(getSource().deepCopy(env), maxMutationCount);
+ return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount);
}
@Override
@@ -109,6 +109,77 @@ class PartialMutationSkippingIterator ex
}
+/**
+ * Iterator wrapper that exposes every entry of its source with a {@link MemKey} as the top key.
+ * <p>
+ * Entries whose key already is a {@link MemKey} are passed through untouched. For any other key the kv count is
+ * taken out of the value with {@link MemValue#splitKVCount(Value)} and a new {@link MemKey} is built from the plain
+ * key and that count; the value handed to callers is a copy with the embedded count removed.
+ * <p>
+ * The iterator remembers the last key it produced. Whenever {@link #consume()} runs, entries that sort at or before
+ * that remembered key are skipped.
+ */
+class MemKeyConversionIterator extends SkippingIterator implements InterruptibleIterator {
+  /** Key of the current top entry as a MemKey; also the point up to which consume() skips. */
+  MemKey currKey = null;
+  /** Value of the current top entry, without an embedded kv count. */
+  Value currVal = null;
+
+  /**
+   * Creates an iterator with no remembered position.
+   *
+   * @param source
+   *          iterator to read entries from
+   */
+  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) {
+    super();
+    setSource(source);
+  }
+
+  /**
+   * Creates an iterator that starts out remembering {@code startKey} as its last produced key.
+   *
+   * @param source
+   *          iterator to read entries from
+   * @param startKey
+   *          key to remember as the current position; may be null, in which case no position is remembered
+   */
+  public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source, MemKey startKey) {
+    this(source);
+    // The guard has to look at startKey: currKey is always null right after this(source), so testing currKey
+    // would make the clone below unreachable and silently drop the position handed in by deepCopy().
+    if (startKey == null)
+      return;
+    try {
+      currKey = (MemKey) startKey.clone();
+    } catch (CloneNotSupportedException e) {
+      // MemKey is expected to be cloneable; fail loudly instead of continuing without a position
+      throw new IllegalStateException("MemKey is expected to support clone()", e);
+    }
+  }
+
+  /**
+   * Copies the source and hands the current key to the copy.
+   */
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    return new MemKeyConversionIterator(getSource().deepCopy(env), currKey);
+  }
+
+  @Override
+  public Key getTopKey() {
+    return currKey;
+  }
+
+  @Override
+  public Value getTopValue() {
+    return currVal;
+  }
+
+  /**
+   * Refreshes currKey and currVal from the source's current top entry, converting to a MemKey when necessary.
+   */
+  private void getTopKeyVal() {
+    Key k = super.getTopKey();
+    Value v = super.getTopValue();
+    if (k instanceof MemKey || k == null) {
+      // already in the expected form (or nothing to convert): pass through as is
+      currKey = (MemKey) k;
+      currVal = v;
+      return;
+    }
+    // work on a copy so that stripping the embedded count does not modify the value held by the source
+    currVal = new Value(v);
+    int mc = MemValue.splitKVCount(currVal);
+    currKey = new MemKey(k, mc);
+  }
+
+  /**
+   * Advances the source and refreshes the converted top key and value.
+   */
+  @Override
+  public void next() throws IOException {
+    super.next();
+    getTopKeyVal();
+  }
+
+  /**
+   * Skips entries that sort at or before the key that was current when this method was entered.
+   */
+  @Override
+  protected void consume() throws IOException {
+    MemKey stopPoint = currKey;
+    if (hasTop())
+      getTopKeyVal();
+    if (stopPoint == null)
+      return;
+    while (getSource().hasTop() && currKey.compareTo(stopPoint) <= 0)
+      next();
+  }
+
+  /**
+   * Forwards the flag to the source, which must itself be an {@link InterruptibleIterator}.
+   */
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+  }
+
+}
+
+
public class InMemoryMap {
MutationLog mutationLog;
@@ -152,7 +223,7 @@ public class InMemoryMap {
public long getMemoryUsed();
- public void mutate(List<Mutation> mutations, int mutationCount);
+ public void mutate(List<Mutation> mutations, int kvCount);
}
private static class DefaultMap implements SimpleMap {
@@ -203,15 +274,14 @@ public class InMemoryMap {
}
@Override
- public void mutate(List<Mutation> mutations, int mutationCount) {
+ public void mutate(List<Mutation> mutations, int kvCount) {
for (Mutation m : mutations) {
for (ColumnUpdate cvp : m.getUpdates()) {
Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(),
- false, mutationCount);
+ false, kvCount++);
Value value = new Value(cvp.getValue());
put(newKey, value);
}
- mutationCount++;
}
}
@@ -253,22 +323,25 @@ public class InMemoryMap {
}
@Override
- public void mutate(List<Mutation> mutations, int mutationCount) {
- nativeMap.mutate(mutations, mutationCount);
+ public void mutate(List<Mutation> mutations, int kvCount) {
+ nativeMap.mutate(mutations, kvCount);
}
}
- private AtomicInteger nextMutationCount = new AtomicInteger(1);
- private AtomicInteger mutationCount = new AtomicInteger(0);
+ private AtomicInteger nextKVCount = new AtomicInteger(1);
+ private AtomicInteger kvCount = new AtomicInteger(0);
/**
* Applies changes to a row in the InMemoryMap
*
*/
public void mutate(List<Mutation> mutations) {
- int mc = nextMutationCount.getAndAdd(mutations.size());
+ int numKVs = 0;
+ for (int i = 0; i < mutations.size(); i++)
+ numKVs += mutations.get(i).size();
+ int kv = nextKVCount.getAndAdd(numKVs);
try {
- map.mutate(mutations, mc);
+ map.mutate(mutations, kv);
} finally {
synchronized (this) {
// Can not update mutationCount while writes that started before
@@ -277,14 +350,14 @@ public class InMemoryMap {
// a read may not see a successful write. Therefore writes must
// wait for writes that started before to finish.
- while (mutationCount.get() != mc - 1) {
+ while (kvCount.get() != kv - 1) {
try {
wait();
} catch (InterruptedException ex) {
// ignored
}
}
- mutationCount.set(mc + mutations.size() - 1);
+ kvCount.set(kv + numKVs - 1);
notifyAll();
}
}
@@ -357,8 +430,8 @@ public class InMemoryMap {
Configuration conf = CachedConfiguration.getInstance();
FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
- FileSKVIterator reader = new MapFileOperations.RangeIterator(new MyMapFile.Reader(fs, memDumpFile, conf));
-
+ FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
+
readers.add(reader);
iter = reader;
@@ -447,10 +520,10 @@ public class InMemoryMap {
if (deleted)
throw new IllegalStateException("Can not obtain iterator after map deleted");
- int mc = mutationCount.get();
+ int mc = kvCount.get();
MemoryDataSource mds = new MemoryDataSource();
SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource());
- MemoryIterator mi = new MemoryIterator(new ColumnFamilySkippingIterator(new PartialMutationSkippingIterator(ssi, mc)));
+ MemoryIterator mi = new MemoryIterator(new ColumnFamilySkippingIterator(new PartialMutationSkippingIterator(new MemKeyConversionIterator(ssi), mc)));
mi.setSSI(ssi);
mi.setMDS(mds);
activeIters.add(mi);
@@ -459,9 +532,9 @@ public class InMemoryMap {
public SortedKeyValueIterator<Key,Value> compactionIterator() {
- if (nextMutationCount.get() - 1 != mutationCount.get())
- throw new IllegalStateException("Memory map in unexpected state : nextMutationCount = " + nextMutationCount.get() + " mutationCount = "
- + mutationCount.get());
+ if (nextKVCount.get() - 1 != kvCount.get())
+ throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = "
+ + kvCount.get());
return new ColumnFamilySkippingIterator(map.skvIterator());
}
@@ -489,17 +562,21 @@ public class InMemoryMap {
Configuration conf = CachedConfiguration.getInstance();
FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
- String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + ".map";
+ String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION;
Configuration newConf = new Configuration(conf);
newConf.setInt("io.seqfile.compress.blocksize", 100000);
- MyMapFile.Writer out = new MyMapFile.Writer(newConf, fs, tmpFile, MemKey.class, Value.class, MySequenceFile.CompressionType.BLOCK);
+ FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration());
+ out.startDefaultLocalityGroup();
InterruptibleIterator iter = map.skvIterator();
iter.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
while (iter.hasTop() && activeIters.size() > 0) {
- out.append(iter.getTopKey(), iter.getTopValue());
+ // RFile does not support MemKey, so we move the kv count into the value only for the RFile.
+ // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written
+ Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount);
+ out.append(iter.getTopKey(), newValue);
iter.next();
}
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java Fri Dec 23 17:53:12 2011
@@ -24,25 +24,25 @@ import org.apache.accumulo.core.data.Key
class MemKey extends Key {
- int mutationCount;
+ int kvCount;
public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean copy, int mc) {
super(row, cf, cq, cv, ts, del, copy);
- this.mutationCount = mc;
+ this.kvCount = mc;
}
public MemKey() {
super();
- this.mutationCount = Integer.MAX_VALUE;
+ this.kvCount = Integer.MAX_VALUE;
}
public MemKey(Key key, int mc) {
super(key);
- this.mutationCount = mc;
+ this.kvCount = mc;
}
public String toString() {
- return super.toString() + " mc=" + mutationCount;
+ return super.toString() + " mc=" + kvCount;
}
@Override
@@ -53,13 +53,13 @@ class MemKey extends Key {
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- out.writeInt(mutationCount);
+ out.writeInt(kvCount);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- mutationCount = in.readInt();
+ kvCount = in.readInt();
}
@Override
@@ -68,7 +68,7 @@ class MemKey extends Key {
int cmp = super.compareTo(k);
if (cmp == 0 && k instanceof MemKey) {
- cmp = ((MemKey) k).mutationCount - mutationCount;
+ cmp = ((MemKey) k).kvCount - kvCount;
}
return cmp;
Added: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java?rev=1222766&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java (added)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java Fri Dec 23 17:53:12 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Value;
+
+/**
+ * A {@link Value} that additionally carries a kv count.
+ * <p>
+ * When the value is serialized with {@link #write(DataOutput)} the count is prepended to the value bytes as a 4-byte
+ * big-endian integer. {@link #splitKVCount(Value)} is the inverse operation: it removes that prefix from a plain
+ * {@link Value} and returns the count.
+ */
+public class MemValue extends Value {
+  /** kv count associated with this value. */
+  int kvCount;
+  /** True once the kvCount prefix has been folded into the stored bytes by write(). */
+  boolean merged = false;
+
+  /**
+   * @param value
+   *          Value
+   * @param kv
+   *          kv count
+   */
+  public MemValue(byte[] value, int kv) {
+    super(value);
+    this.kvCount = kv;
+  }
+
+  /**
+   * Creates an empty value whose kv count is {@link Integer#MAX_VALUE}.
+   */
+  public MemValue() {
+    super();
+    this.kvCount = Integer.MAX_VALUE;
+  }
+
+  /**
+   * @param value
+   *          Value
+   * @param kv
+   *          kv count
+   */
+  public MemValue(Value value, int kv) {
+    super(value);
+    this.kvCount = kv;
+  }
+
+  /**
+   * Writes the value with the kv count prepended as 4 big-endian bytes.
+   * <p>
+   * Note that the first call replaces the stored bytes with the combined form, so after writing, the stored bytes
+   * include the 4-byte prefix until {@link #set(byte[])} or {@link #copy(byte[])} is called.
+   *
+   * @param out
+   *          destination to write to
+   * @throws IOException
+   *           if the underlying write fails
+   */
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    if (!merged) {
+      byte[] combinedBytes = new byte[getSize() + 4];
+      System.arraycopy(value, 0, combinedBytes, 4, getSize());
+      combinedBytes[0] = (byte) (kvCount >>> 24);
+      combinedBytes[1] = (byte) (kvCount >>> 16);
+      combinedBytes[2] = (byte) (kvCount >>> 8);
+      combinedBytes[3] = (byte) (kvCount);
+      value = combinedBytes;
+      // remember that the prefix is in place so a second write does not prepend it again
+      merged = true;
+    }
+    super.write(out);
+  }
+
+  /**
+   * Sets the value bytes; the new bytes are treated as not yet containing the kv count prefix.
+   */
+  public void set(final byte[] b) {
+    super.set(b);
+    merged = false;
+  }
+
+  /**
+   * Copies the given bytes into this value; the new bytes are treated as not yet containing the kv count prefix.
+   */
+  public void copy(byte[] b) {
+    super.copy(b);
+    merged = false;
+  }
+
+  /**
+   * Extracts the kv count that belongs to a value.
+   * <p>
+   * For a {@link MemValue} the count is read from the object and the value is left untouched. For any other
+   * {@link Value} the first 4 bytes are decoded as a big-endian integer and the value is replaced, in place, by the
+   * remaining bytes.
+   *
+   * @param v
+   *          value to take the kv count from; modified in place unless it is a {@link MemValue}
+   * @return the kv count
+   * @throws IllegalArgumentException
+   *           if {@code v} is not a {@link MemValue} and holds fewer than 4 bytes
+   */
+  public static int splitKVCount(Value v) {
+    if (v instanceof MemValue)
+      return ((MemValue) v).kvCount;
+
+    int size = v.getSize();
+    if (size < 4)
+      throw new IllegalArgumentException("Value of " + size + " bytes is too short to contain an embedded kvCount");
+
+    byte[] originalBytes = new byte[size - 4];
+    byte[] combined = v.get();
+    System.arraycopy(combined, 4, originalBytes, 0, originalBytes.length);
+    v.set(originalBytes);
+    return ((combined[0] & 0xFF) << 24) | ((combined[1] & 0xFF) << 16) | ((combined[2] & 0xFF) << 8) | (combined[3] & 0xFF);
+  }
+}
Propchange: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Fri Dec 23 17:53:12 2011
@@ -66,7 +66,6 @@ import org.apache.accumulo.core.data.thr
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.file.map.MyMapFile;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
@@ -1548,7 +1547,7 @@ public class Tablet {
continue;
}
- if (!filename.startsWith(MyMapFile.EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
+ if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
log.error("unknown file in tablet" + path);
continue;
}
Copied: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java (from r1215244, incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java)
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java?p2=incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java&p1=incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java&r1=1215244&r2=1222766&rev=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java Fri Dec 23 17:53:12 2011
@@ -20,17 +20,17 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.map.MyMapFile;
-import org.apache.accumulo.core.file.map.MyMapFile.Writer;
-import org.apache.accumulo.core.file.map.MySequenceFile.CompressionType;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
-public class CreateRandomMapFile {
+public class CreateRandomRFile {
private static int num;
private static String file;
@@ -62,10 +62,10 @@ public class CreateRandomMapFile {
Arrays.sort(rands);
Configuration conf = CachedConfiguration.getInstance();
- Writer mfw;
+ FileSKVWriter mfw;
try {
FileSystem fs = FileSystem.get(conf);
- mfw = new MyMapFile.Writer(conf, fs, file, Key.class, Value.class, CompressionType.BLOCK);
+ mfw = new RFileOperations().openWriter(file, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
} catch (IOException e) {
throw new RuntimeException(e);
}
Propchange: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java Fri Dec 23 17:53:12 2011
@@ -23,18 +23,20 @@ import java.util.Comparator;
import java.util.List;
import java.util.Random;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.file.map.MyMapFile;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.map.MySequenceFile;
import org.apache.accumulo.core.file.map.MySequenceFile.Reader;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
class MultipleIndexIterator2 {
@@ -83,12 +85,12 @@ class MultipleIndexIterator2 {
return currentMin >= 0;
}
- WritableComparable<?> next() {
+ Key next() {
if (currentMin < 0) {
throw new RuntimeException("There is no next");
}
- WritableComparable<?> ret = nextKey[currentMin];
+ Key ret = nextKey[currentMin];
try {
nextKey[currentMin] = (Key) readers[currentMin].getKeyClass().newInstance();
@@ -214,16 +216,16 @@ public class MidPointPerfTest2 {
start = end;
- Path outFile = new Path(String.format("%s/index_%04d", newDir, count++));
- outFiles.add(outFile);
+ String outFile = String.format("%s/index_%04d", newDir, count++);
+ outFiles.add(new Path(outFile));
long t1 = System.currentTimeMillis();
- MySequenceFile.Writer writer = MySequenceFile.createWriter(fs, conf, outFile, Key.class, LongWritable.class, MySequenceFile.CompressionType.BLOCK);
+ FileSKVWriter writer = new RFileOperations().openWriter(outFile, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
MultipleIndexIterator2 mii = new MultipleIndexIterator2(conf, fs, inFiles);
while (mii.hasNext()) {
- writer.append(mii.next(), new LongWritable(0));
+ writer.append(mii.next(), new Value(new byte[0]));
}
mii.close();
@@ -254,7 +256,7 @@ public class MidPointPerfTest2 {
FileSystem fs = FileSystem.get(conf);
for (int i = 0; i < numFiles; i++) {
- String newDir = String.format("%s/" + MyMapFile.EXTENSION + "_%06d", dir, i);
+ String newDir = String.format("%s/" + RFile.EXTENSION + "_%06d", dir, i);
fs.mkdirs(new Path(newDir));
List<Key> keys = new ArrayList<Key>();
@@ -267,13 +269,12 @@ public class MidPointPerfTest2 {
Collections.sort(keys, new CompareKeys());
- MySequenceFile.Writer writer = MySequenceFile.createWriter(fs, conf, new Path(newDir + "/index"), Key.class, LongWritable.class,
- MySequenceFile.CompressionType.BLOCK);
+ FileSKVWriter writer = new RFileOperations().openWriter(newDir, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
System.out.println(new Path(newDir + "/index"));
for (Key key : keys) {
- writer.append(key, new LongWritable(0));
+ writer.append(key, new Value(new byte[0]));
}
writer.close();
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java Fri Dec 23 17:53:12 2011
@@ -38,7 +38,6 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.map.MyMapFile;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
@@ -295,7 +294,7 @@ public class TestIngest {
if (ingestArgs.outputToMapFile) {
Configuration conf = CachedConfiguration.getInstance();
FileSystem fs = FileSystem.get(conf);
- writer = FileOperations.getInstance().openWriter(ingestArgs.outputFile + "." + MyMapFile.EXTENSION, fs, conf,
+ writer = FileOperations.getInstance().openWriter(ingestArgs.outputFile + "." + RFile.EXTENSION, fs, conf,
AccumuloConfiguration.getDefaultConfiguration());
writer.startDefaultLocalityGroup();
} else if (ingestArgs.outputToRFile) {
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java Fri Dec 23 17:53:12 2011
@@ -65,7 +65,7 @@ public class BadIteratorMincTest extends
UtilWaitThread.sleep(1000);
// minc should fail, so there should be no files
- checkMapFiles("foo", 1, 1, 0, 0);
+ checkRFiles("foo", 1, 1, 0, 0);
// try to scan table
Scanner scanner = getConnector().createScanner("foo", Constants.NO_AUTHS);
@@ -85,7 +85,7 @@ public class BadIteratorMincTest extends
UtilWaitThread.sleep(5000);
// minc should complete
- checkMapFiles("foo", 1, 1, 1, 1);
+ checkRFiles("foo", 1, 1, 1, 1);
count = 0;
for (@SuppressWarnings("unused")
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java Fri Dec 23 17:53:12 2011
@@ -83,10 +83,10 @@ public class BloomFilterTest extends Fun
getConnector().tableOperations().flush("bt4", null, null, true);
// ensure minor compactions are finished
- super.checkMapFiles("bt1", 1, 1, 1, 1);
- super.checkMapFiles("bt2", 1, 1, 1, 1);
- super.checkMapFiles("bt3", 1, 1, 1, 1);
- super.checkMapFiles("bt4", 1, 1, 1, 1);
+ super.checkRFiles("bt1", 1, 1, 1, 1);
+ super.checkRFiles("bt2", 1, 1, 1, 1);
+ super.checkRFiles("bt3", 1, 1, 1, 1);
+ super.checkRFiles("bt4", 1, 1, 1, 1);
// these queries should only run quickly if bloom
// filters are working
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java Fri Dec 23 17:53:12 2011
@@ -29,7 +29,6 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.file.map.MyMapFile;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.trace.TraceFileSystem;
@@ -62,14 +61,14 @@ public class BulkFileTest extends Functi
fs.delete(new Path(dir), true);
- FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + MyMapFile.EXTENSION, fs, conf, ServerConfiguration.getSystemConfiguration());
+ FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf,
+ ServerConfiguration.getSystemConfiguration());
writer1.startDefaultLocalityGroup();
writeData(writer1, 0, 333);
writer1.close();
- fs.rename(new Path(dir + "/f1." + MyMapFile.EXTENSION), new Path(dir + "/f1"));
-
- FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + MyMapFile.EXTENSION, fs, conf, ServerConfiguration.getSystemConfiguration());
+ FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf,
+ ServerConfiguration.getSystemConfiguration());
writer2.startDefaultLocalityGroup();
writeData(writer2, 334, 999);
writer2.close();
@@ -81,7 +80,7 @@ public class BulkFileTest extends Functi
bulkImport(fs, "bulkFile", dir);
- checkMapFiles("bulkFile", 6, 6, 1, 1);
+ checkRFiles("bulkFile", 6, 6, 1, 1);
verifyData("bulkFile", 0, 1999);
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java Fri Dec 23 17:53:12 2011
@@ -68,7 +68,7 @@ public class BulkSplitOptimizationTest e
bulkImport(fs, TABLE_NAME, "/testmf");
checkSplits(TABLE_NAME, 0, 0);
- checkMapFiles(TABLE_NAME, 1, 1, 100, 100);
+ checkRFiles(TABLE_NAME, 1, 1, 100, 100);
// initiate splits
getConnector().tableOperations().setProperty(TABLE_NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");
@@ -85,6 +85,6 @@ public class BulkSplitOptimizationTest e
VerifyIngest.main(new String[] {"-timestamp", "1", "-size", "50", "-random", "56", "100000", "0", "1"});
// ensure each tablet does not have all map files
- checkMapFiles(TABLE_NAME, 50, 100, 1, 4);
+ checkRFiles(TABLE_NAME, 50, 100, 1, 4);
}
}
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java Fri Dec 23 17:53:12 2011
@@ -60,7 +60,7 @@ public class DeleteEverythingTest extend
getConnector().tableOperations().flush("de", null, null, true);
- checkMapFiles("de", 1, 1, 1, 1);
+ checkRFiles("de", 1, 1, 1, 1);
m = new Mutation(new Text("foo"));
m.putDelete(new Text("bar"), new Text("1910"));
@@ -84,7 +84,7 @@ public class DeleteEverythingTest extend
getConnector().tableOperations().setProperty("de", Property.TABLE_MAJC_RATIO.getKey(), "1.0");
UtilWaitThread.sleep(4000);
- checkMapFiles("de", 1, 1, 0, 0);
+ checkRFiles("de", 1, 1, 0, 0);
bw.close();
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java Fri Dec 23 17:53:12 2011
@@ -218,11 +218,11 @@ public abstract class FunctionalTest {
}
/**
- * A utility function that checks that each tablet has an expected number of map files.
+ * A utility function that checks that each tablet has an expected number of rfiles.
*
*/
- protected void checkMapFiles(String tableName, int minTablets, int maxTablets, int minMapFiles, int maxMapFiles) throws Exception {
+ protected void checkRFiles(String tableName, int minTablets, int maxTablets, int minRFiles, int maxRFiles) throws Exception {
Scanner scanner = getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
String tableId = Tables.getNameToIdMap(getInstance()).get(tableName);
scanner.setRange(new Range(new Text(tableId + ";"), true, new Text(tableId + "<"), true));
@@ -251,7 +251,7 @@ public abstract class FunctionalTest {
Set<Entry<Text,Integer>> es = tabletFileCounts.entrySet();
for (Entry<Text,Integer> entry : es) {
- if (entry.getValue() > maxMapFiles || entry.getValue() < minMapFiles) {
+ if (entry.getValue() > maxRFiles || entry.getValue() < minRFiles) {
throw new Exception("tablet " + entry.getKey() + " has " + entry.getValue() + " map files");
}
}
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java Fri Dec 23 17:53:12 2011
@@ -66,7 +66,7 @@ public class MaxOpenTest extends Functio
TestIngest.main(new String[] {"-random", "" + i, "-timestamp", "" + i, "-size", "" + 50, "" + NUM_TO_INGEST, "0", "1"});
getConnector().tableOperations().flush("test_ingest", null, null, true);
- checkMapFiles("test_ingest", NUM_TABLETS, NUM_TABLETS, i + 1, i + 1);
+ checkRFiles("test_ingest", NUM_TABLETS, NUM_TABLETS, i + 1, i + 1);
}
List<Range> ranges = new ArrayList<Range>(NUM_TO_INGEST);
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java Fri Dec 23 17:53:12 2011
@@ -62,7 +62,7 @@ public class RowDeleteTest extends Funct
bw.flush();
getConnector().tableOperations().flush("rdel1", null, null, true);
- checkMapFiles("rdel1", 1, 1, 1, 1);
+ checkRFiles("rdel1", 1, 1, 1, 1);
int count = 0;
Scanner scanner = getConnector().createScanner("rdel1", Constants.NO_AUTHS);
@@ -81,7 +81,7 @@ public class RowDeleteTest extends Funct
// Wait for the files in HDFS to be older than the future compaction date
UtilWaitThread.sleep(2000);
- checkMapFiles("rdel1", 1, 1, 2, 2);
+ checkRFiles("rdel1", 1, 1, 2, 2);
count = 0;
scanner = getConnector().createScanner("rdel1", Constants.NO_AUTHS);
@@ -94,7 +94,7 @@ public class RowDeleteTest extends Funct
getConnector().tableOperations().compact("rdel1", null, null, false, true);
- checkMapFiles("rdel1", 1, 1, 0, 0);
+ checkRFiles("rdel1", 1, 1, 0, 0);
count = 0;
scanner = getConnector().createScanner("rdel1", Constants.NO_AUTHS);
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java Fri Dec 23 17:53:12 2011
@@ -34,7 +34,7 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.file.map.MyMapFile;
+import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -48,9 +48,9 @@ import org.apache.accumulo.server.tablet
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.io.Text;
public class SplitRecoveryTest extends FunctionalTest {
@@ -128,7 +128,7 @@ public class SplitRecoveryTest extends F
String tdir = "/dir_" + i;
MetadataTable.addTablet(extent, tdir, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID, zl);
SortedMap<String,DataFileValue> mapFiles = new TreeMap<String,DataFileValue>();
- mapFiles.put(tdir + "/" + MyMapFile.EXTENSION + "_000_000", new DataFileValue(1000017 + i, 10000 + i));
+ mapFiles.put(tdir + "/" + RFile.EXTENSION + "_000_000", new DataFileValue(1000017 + i, 10000 + i));
if (i == extentToSplit) {
splitMapFiles = mapFiles;
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java Fri Dec 23 17:53:12 2011
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Range;
@@ -217,7 +218,7 @@ public class MapFilePerformanceTest {
System.out.println("Thread " + Thread.currentThread().getName() + " creating map files blocksize = " + blocksize + " num = " + num);
String[] filenames;
try {
- filenames = createMapFiles(args[0], args[1] + "/" + MyMapFile.EXTENSION + "_" + blocksize, blocksize, num);
+ filenames = createMapFiles(args[0], args[1] + "/" + Constants.MAPFILE_EXTENSION + "_" + blocksize, blocksize, num);
synchronized (tests) {
Map<Integer,String[]> map = tests.get(num);
Modified: incubator/accumulo/trunk/test/system/auto/simple/compaction.py
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/test/system/auto/simple/compaction.py?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/test/system/auto/simple/compaction.py (original)
+++ incubator/accumulo/trunk/test/system/auto/simple/compaction.py Fri Dec 23 17:53:12 2011
@@ -44,7 +44,7 @@ class CompactionTest(SimpleBulkTest):
handle = self.runClassOn(
self.masterHost(),
'org.apache.accumulo.server.test.CreateMapFiles',
- "testmf 4 0 500000 59".split())
+ "testrf 4 0 500000 59".split())
out, err = handle.communicate()
self.assert_(handle.returncode == 0)
@@ -52,8 +52,8 @@ class CompactionTest(SimpleBulkTest):
# initialize the database
self.createTable('test_ingest')
- self.execute(self.masterHost(), 'hadoop dfs -rmr /testmf'.split())
- self.execute(self.masterHost(), 'hadoop dfs -rmr /testmfFail'.split())
+ self.execute(self.masterHost(), 'hadoop dfs -rmr /testrf'.split())
+ self.execute(self.masterHost(), 'hadoop dfs -rmr /testrfFail'.split())
# insert some data
self.createMapFiles(self.masterHost())