You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2012/10/16 00:17:26 UTC
svn commit: r1398539 [3/4] - in /accumulo/branches/ACCUMULO-259: ./
assemble/ assemble/platform/ assemble/platform/debian/
assemble/platform/debian/init.d/ conf/examples/1GB/native-standalone/
conf/examples/1GB/standalone/ conf/examples/2GB/native-stan...
Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Mon Oct 15 22:17:22 2012
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTru
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -107,6 +108,24 @@ public class AccumuloInputFormatTest {
String iterators = conf.get("AccumuloInputFormat.iterators");
assertEquals(new String(Base64.encodeBase64(baos.toByteArray())), iterators);
}
+
+ /**
+  * Test-only subclass that re-exports {@code InputFormatBase.getRanges(Configuration)} so the test below can read back the ranges stored
+  * in a configuration. NOTE(review): presumably needed because the base method is not otherwise accessible from the test - confirm.
+  */
+ abstract static class GetRanges<K,V> extends InputFormatBase<K,V> {
+   public static List<Range> getRanges(Configuration conf) throws IOException {
+     return InputFormatBase.getRanges(conf);
+   }
+ }
+
+ /**
+  * Stores 100000 single-row ranges in a job configuration and checks that the same list is read back from it.
+  */
+ @Test
+ public void testSetRanges() throws IOException {
+   Configuration conf = ContextFactory.createJobContext().getConfiguration();
+
+   List<Range> expected = new ArrayList<Range>();
+   for (int row = 0; row < 100000; row++) {
+     String rowId = String.format("%05x", row);
+     expected.add(new Range(new Text(rowId)));
+   }
+
+   AccumuloInputFormat.setRanges(conf, expected);
+   List<Range> actual = GetRanges.getRanges(conf);
+
+   assertEquals(expected, actual);
+ }
@Test
public void testAddIterator() {
Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockConnectorTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockConnectorTest.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockConnectorTest.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockConnectorTest.java Mon Oct 15 22:17:22 2012
@@ -25,12 +25,17 @@ import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Random;
+import junit.framework.Assert;
+
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.Scanner;
@@ -53,7 +58,7 @@ public class MockConnectorTest {
@Test
public void testSunnyDay() throws Exception {
- Connector c = new MockConnector("root");
+ Connector c = new MockConnector("root", new MockInstance());
c.tableOperations().create("test");
BatchWriter bw = c.createBatchWriter("test", new BatchWriterConfig());
for (int i = 0; i < 100; i++) {
@@ -79,7 +84,7 @@ public class MockConnectorTest {
@Test
public void testChangeAuths() throws Exception {
- Connector c = new MockConnector("root");
+ Connector c = new MockConnector("root", new MockInstance());
c.securityOperations().createUser("greg", new byte[] {}, new Authorizations("A", "B", "C"));
assertTrue(c.securityOperations().getUserAuthorizations("greg").contains("A".getBytes()));
c.securityOperations().changeUserAuthorizations("greg", new Authorizations("X", "Y", "Z"));
@@ -120,7 +125,7 @@ public class MockConnectorTest {
@Test
public void testDelete() throws Exception {
- Connector c = new MockConnector("root");
+ Connector c = new MockConnector("root", new MockInstance());
c.tableOperations().create("test");
BatchWriter bw = c.createBatchWriter("test", new BatchWriterConfig());
@@ -159,7 +164,7 @@ public class MockConnectorTest {
@Test
public void testDeletewithBatchDeleter() throws Exception {
- Connector c = new MockConnector("root");
+ Connector c = new MockConnector("root", new MockInstance());
// make sure we are using a clean table
if (c.tableOperations().exists("test"))
@@ -230,7 +235,7 @@ public class MockConnectorTest {
@Test
public void testCMod() throws Exception {
// test writing to a table that is being scanned
- Connector c = new MockConnector("root");
+ Connector c = new MockConnector("root", new MockInstance());
c.tableOperations().create("test");
BatchWriter bw = c.createBatchWriter("test", new BatchWriterConfig());
@@ -281,7 +286,7 @@ public class MockConnectorTest {
@Test
public void testMockMultiTableBatchWriter() throws Exception {
- Connector c = new MockConnector("root");
+ Connector c = new MockConnector("root", new MockInstance());
c.tableOperations().create("a");
c.tableOperations().create("b");
MultiTableBatchWriter bw = c.createMultiTableBatchWriter(new BatchWriterConfig());
@@ -313,7 +318,7 @@ public class MockConnectorTest {
@Test
public void testUpdate() throws Exception {
- Connector c = new MockConnector("root");
+ Connector c = new MockConnector("root", new MockInstance());
c.tableOperations().create("test");
BatchWriter bw = c.createBatchWriter("test", new BatchWriterConfig());
@@ -332,5 +337,14 @@ public class MockConnectorTest {
assertEquals("9", entry.getValue().toString());
}
+
+ /**
+  * Checks that a connector obtained from a {@link MockInstance} reports that same instance, including its name.
+  */
+ @Test
+ public void testMockConnectorReturnsCorrectInstance() throws AccumuloException, AccumuloSecurityException {
+   String name = "an-interesting-instance-name";
+   Instance mockInstance = new MockInstance(name);
+   // one connector is enough: both assertions inspect the instance it reports
+   Instance reported = mockInstance.getConnector("foo", "bar").getInstance();
+   // use the statically imported assertEquals, like the other tests in this class
+   assertEquals(mockInstance, reported);
+   assertEquals(name, reported.getInstanceName());
+ }
}
Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java Mon Oct 15 22:17:22 2012
@@ -16,19 +16,114 @@
*/
package org.apache.accumulo.core.client.mock;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
public class MockTableOperationsTest {
+
+ @Test
+ public void testCreateUseVersions() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException {
+ Instance instance = new MockInstance("topstest");
+ Connector conn = instance.getConnector("user", "pass");
+ String t = "tableName1";
+
+ {
+ conn.tableOperations().create(t, false, TimeType.LOGICAL);
+
+ writeVersionable(conn, t, 3);
+ assertVersionable(conn, t, 3);
+
+ IteratorSetting settings = new IteratorSetting(20,VersioningIterator.class);
+ conn.tableOperations().attachIterator(t, settings);
+
+ assertVersionable(conn, t, 1);
+
+ conn.tableOperations().delete(t);
+ }
+
+ {
+ conn.tableOperations().create(t, true, TimeType.MILLIS);
+
+ try {
+ IteratorSetting settings = new IteratorSetting(20,VersioningIterator.class);
+ conn.tableOperations().attachIterator(t, settings);
+ Assert.fail();
+ }
+ catch (IllegalArgumentException ex) {}
+
+ writeVersionable(conn, t, 3);
+ assertVersionable(conn, t, 1);
+
+ conn.tableOperations().delete(t);
+ }
+ }
+
+ /**
+  * Writes {@code size} successive values ("0", "1", ...) to the cell row1/cf/cq of the given table. Each value goes through its own
+  * batch writer, which is closed before the next value is written.
+  */
+ protected void writeVersionable(Connector c, String tableName, int size) throws TableNotFoundException, MutationsRejectedException {
+   int version = 0;
+   while (version < size) {
+     Mutation update = new Mutation("row1");
+     update.put("cf", "cq", String.valueOf(version));
+
+     BatchWriter writer = c.createBatchWriter(tableName, 100, 100, 1);
+     writer.addMutation(update);
+     writer.close();
+
+     version++;
+   }
+ }
+
+ /**
+  * Scans the cell row1/cf/cq of the given table and asserts that exactly {@code size} entries come back, each carrying that row, column
+  * family and column qualifier.
+  */
+ protected void assertVersionable(Connector c, String tableName, int size) throws TableNotFoundException {
+   BatchScanner s = c.createBatchScanner(tableName, Constants.NO_AUTHS, 1);
+   try {
+     s.setRanges(Collections.singleton(Range.exact("row1", "cf", "cq")));
+     int count = 0;
+     for (Map.Entry<Key, Value> e : s) {
+       Assert.assertEquals("row1", e.getKey().getRow().toString());
+       Assert.assertEquals("cf", e.getKey().getColumnFamily().toString());
+       Assert.assertEquals("cq", e.getKey().getColumnQualifier().toString());
+       count++;
+     }
+     Assert.assertEquals(size, count);
+   } finally {
+     // close the scanner even when one of the assertions above fails
+     s.close();
+   }
+ }
+
@Test
public void testTableNotFound() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException {
Instance instance = new MockInstance("topstest");
@@ -80,4 +175,92 @@ public class MockTableOperationsTest {
Assert.fail();
} catch (TableExistsException e) {}
}
+
+ private static class ImportTestFilesAndData { // bundles what prepareTestFiles() hands back to the import tests
+ Path importPath; // directory that contains the generated file to import
+ Path failurePath; // directory the tests pass to importDirectory as the failure directory
+ List<Pair<Key, Value>> keyVals; // the key/value pairs written to the generated file, in write order
+ }
+
+ /**
+  * Imports the directory produced by {@link #prepareTestFiles()} into a freshly created table and checks that a scan returns the
+  * key/value pairs that were written, in the same order, and nothing else.
+  */
+ @Test
+ public void testImport() throws Throwable {
+   ImportTestFilesAndData fixture = prepareTestFiles();
+   Instance instance = new MockInstance("foo");
+   AuthInfo credentials = new AuthInfo("user", ByteBuffer.wrap(new byte[0]), "foo");
+   Connector connector = instance.getConnector(credentials);
+
+   TableOperations ops = connector.tableOperations();
+   ops.create("a_table");
+   ops.importDirectory("a_table", fixture.importPath.toString(), fixture.failurePath.toString(), false);
+
+   Scanner scanner = connector.createScanner("a_table", new Authorizations());
+   Iterator<Entry<Key,Value>> scanned = scanner.iterator();
+   // prepareTestFiles() writes exactly five pairs, so this walks the same five entries as an index loop from 0 to 4
+   for (Pair<Key,Value> expected : fixture.keyVals) {
+     Assert.assertTrue(scanned.hasNext());
+     Entry<Key,Value> actual = scanned.next();
+     Assert.assertEquals(expected.getFirst(), actual.getKey());
+     Assert.assertEquals(expected.getSecond(), actual.getValue());
+   }
+   Assert.assertFalse(scanned.hasNext());
+ }
+
+ /**
+  * Builds the on-disk fixture for the import tests under target/accumulo-test: a freshly recreated failures directory and an import
+  * directory holding one file, sample.rf, with five key/value pairs. The sample file and the failures directory are registered for
+  * deletion on exit.
+  *
+  * @return the import directory, the failures directory and the pairs that were written
+  */
+ private ImportTestFilesAndData prepareTestFiles() throws Throwable {
+   Configuration hadoopConf = new Configuration();
+   Path sampleFile = new Path("target/accumulo-test/import/sample.rf");
+   Path failureDir = new Path("target/accumulo-test/failures/");
+   FileSystem localFs = FileSystem.get(new URI("file:///"), hadoopConf);
+
+   // remove whatever an earlier run left behind, then recreate the directories
+   localFs.deleteOnExit(sampleFile);
+   localFs.deleteOnExit(failureDir);
+   localFs.delete(failureDir, true);
+   localFs.delete(sampleFile, true);
+   localFs.mkdirs(failureDir);
+   localFs.mkdirs(sampleFile.getParent());
+
+   FileSKVWriter writer = FileOperations.getInstance().openWriter(sampleFile.toString(), localFs, hadoopConf,
+       AccumuloConfiguration.getDefaultConfiguration());
+   writer.startDefaultLocalityGroup();
+
+   List<Pair<Key,Value>> pairs = new ArrayList<Pair<Key,Value>>();
+   for (int i = 0; i < 5; i++) {
+     Key key = new Key("a" + i, "b" + i, "c" + i, new ColumnVisibility(""), 1000L + i);
+     Value value = new Value(Integer.toString(i).getBytes());
+     pairs.add(new Pair<Key,Value>(key, value));
+   }
+   for (Pair<Key,Value> pair : pairs) {
+     writer.append(pair.getFirst(), pair.getSecond());
+   }
+   writer.close();
+
+   ImportTestFilesAndData result = new ImportTestFilesAndData();
+   result.importPath = sampleFile.getParent();
+   result.failurePath = failureDir;
+   result.keyVals = pairs;
+   return result;
+ }
+
+ /**
+  * Importing into a table that was never created is expected to fail with {@link TableNotFoundException}.
+  */
+ @Test(expected = TableNotFoundException.class)
+ public void testFailsWithNoTable() throws Throwable {
+   Instance instance = new MockInstance("foo");
+   AuthInfo credentials = new AuthInfo("user", ByteBuffer.wrap(new byte[0]), "foo");
+   TableOperations ops = instance.getConnector(credentials).tableOperations();
+
+   ImportTestFilesAndData fixture = prepareTestFiles();
+   String importDir = fixture.importPath.toString();
+   String failureDir = fixture.failurePath.toString();
+
+   ops.importDirectory("doesnt_exist_table", importDir, failureDir, false);
+ }
+
+ /**
+  * Importing is expected to fail with an {@link IOException} when the failure directory already contains a file.
+  */
+ @Test(expected = IOException.class)
+ public void testFailsWithNonEmptyFailureDirectory() throws Throwable {
+   Instance instance = new MockInstance("foo");
+   Connector connector = instance.getConnector(new AuthInfo("user", ByteBuffer.wrap(new byte[0]), "foo"));
+   TableOperations tableOperations = connector.tableOperations();
+   // the table has to exist, otherwise the import can fail for the missing table (see testFailsWithNoTable) instead of for the
+   // failure directory
+   tableOperations.create("a_table");
+   ImportTestFilesAndData testFiles = prepareTestFiles();
+   FileSystem fs = testFiles.failurePath.getFileSystem(new Configuration());
+   // create (not open) the file: prepareTestFiles() leaves the failure directory empty, so opening a file in it would itself throw
+   // an IOException and the test would pass without importDirectory ever being called
+   fs.create(testFiles.failurePath.suffix("/something")).close();
+   tableOperations.importDirectory("a_table", testFiles.importPath.toString(), testFiles.failurePath.toString(), false);
+ }
+
}
Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java Mon Oct 15 22:17:22 2012
@@ -63,8 +63,6 @@ public class MutationTest extends TestCa
assertEquals("cq1", new String(cu.getColumnQualifier()));
assertEquals("", new String(cu.getColumnVisibility()));
assertFalse(cu.hasTimestamp());
- cu.setSystemTimestamp(42l);
- assertEquals(42l, cu.getTimestamp());
cu = updates.get(1);
@@ -87,7 +85,6 @@ public class MutationTest extends TestCa
assertEquals("cq1", new String(cu.getColumnQualifier()));
assertEquals("", new String(cu.getColumnVisibility()));
assertFalse(cu.hasTimestamp());
- assertEquals(42l, cu.getTimestamp());
cu = updates.get(1);
@@ -345,6 +342,76 @@ public class MutationTest extends TestCa
assertEquals(2, m2.size());
assertEquals(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
assertEquals(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
+ }
+
+ /**
+  * Serializes the given {@link OldMutation} into a byte array and reads those bytes back into a new {@link Mutation}.
+  */
+ Mutation convert(OldMutation old) throws IOException {
+   ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+   DataOutputStream out = new DataOutputStream(buffer);
+   old.write(out);
+   out.close();
+
+   Mutation converted = new Mutation();
+   DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+   converted.readFields(in);
+   in.close();
+   return converted;
+ }
+
+
+ /**
+  * Writes an {@link OldMutation}, reads it back both as an {@link OldMutation} and, through {@code convert}, as a {@link Mutation},
+  * and checks that all three updates survive each way. It then serializes a one-update {@link Mutation}, asserts that it takes fewer
+  * bytes than the serialized old mutation, and dumps the sizes and the new encoding to stdout.
+  */
+ public void testNewSerialization() throws Exception {
+   // write an old mutation
+   OldMutation m2 = new OldMutation("r1");
+   m2.put("cf1", "cq1", "v1");
+   m2.put("cf2", "cq2", new ColumnVisibility("cv2"), "v2");
+   m2.putDelete("cf3", "cq3");
+   ByteArrayOutputStream bos = new ByteArrayOutputStream();
+   DataOutputStream dos = new DataOutputStream(bos);
+   m2.write(dos);
+   dos.close();
+   long oldSize = dos.size();
+   ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+   DataInputStream dis = new DataInputStream(bis);
+   m2.readFields(dis);
+   dis.close();
+
+   // check it
+   assertEquals("r1", new String(m2.getRow()));
+   assertEquals(3, m2.getUpdates().size());
+   assertEquals(3, m2.size());
+   assertEquals(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
+   assertEquals(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
+   assertEquals(m2.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
+
+   // read the same bytes as a Mutation and check every update on the converted object
+   Mutation m1 = convert(m2);
+
+   assertEquals("r1", new String(m1.getRow()));
+   assertEquals(3, m1.getUpdates().size());
+   assertEquals(3, m1.size());
+   assertEquals(m1.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
+   assertEquals(m1.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
+   // this line used to re-check m2, so the delete was never verified on the converted mutation
+   assertEquals(m1.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
+
+   Text exampleRow = new Text(" 123456789 123456789 123456789 123456789 123456789");
+   int exampleLen = exampleRow.getLength();
+   m1 = new Mutation(exampleRow);
+   m1.put("", "", "");
+
+   bos = new ByteArrayOutputStream();
+   dos = new DataOutputStream(bos);
+   m1.write(dos);
+   dos.close();
+   long newSize = dos.size();
+   assertTrue(newSize < oldSize);
+   // report both sizes with the example row length subtracted, then hex-dump the new encoding in groups of four bytes
+   System.out.println(String.format("%d %d %.2f%%", newSize - exampleLen, oldSize - exampleLen, (newSize-exampleLen) * 100. / (oldSize - exampleLen)));
+   byte[] ba = bos.toByteArray();
+   for (int i = 0; i < bos.size(); i += 4) {
+     for (int j = i; j < bos.size() && j < i + 4; j++) {
+       System.out.append(String.format("%02x", ba[j]));
+     }
+     System.out.append(" ");
+   }
+   System.out.println();
}
+
}
Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/user/RegExFilterTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/user/RegExFilterTest.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/user/RegExFilterTest.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/user/RegExFilterTest.java Mon Oct 15 22:17:22 2012
@@ -67,6 +67,21 @@ public class RegExFilterTest extends Tes
assertTrue(rei.getTopKey().equals(k3));
rei.next();
assertFalse(rei.hasTop());
+
+ // -----------------------------------------------------
+ // Test substring regex
+ is.clearOptions();
+
+ RegExFilter.setRegexs(is, null, null, null, "amst", false, true); // Should only match hamster
+
+ rei.validateOptions(is.getOptions());
+ rei.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment());
+ rei.seek(new Range(), EMPTY_COL_FAMS, false);
+
+ assertTrue(rei.hasTop());
+ assertTrue(rei.getTopKey().equals(k3));
+ rei.next();
+ assertFalse(rei.hasTop());
// -----------------------------------------------------
is.clearOptions();
Modified: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (original)
+++ accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java Mon Oct 15 22:17:22 2012
@@ -47,6 +47,7 @@ public class ZooStore<T> implements TSto
private String path;
private IZooReaderWriter zk;
+ private String lastReserved = "";
private Set<Long> reserved;
private Map<Long,Long> defered;
private SecureRandom idgenerator;
@@ -123,20 +124,33 @@ public class ZooStore<T> implements TSto
events = statusChangeEvents;
}
- List<String> txdirs = zk.getChildren(path);
+ List<String> txdirs = new ArrayList<String>(zk.getChildren(path));
+ Collections.sort(txdirs);
+
+ synchronized (this) {
+ if (txdirs.size() > 0 && txdirs.get(txdirs.size() - 1).compareTo(lastReserved) <= 0)
+ lastReserved = "";
+ }
for (String txdir : txdirs) {
long tid = parseTid(txdir);
synchronized (this) {
+ // this check makes reserve pick up where it left off, so that it cycles through all as it is repeatedly called.... failing to do so can lead to
+ // starvation where fate ops that sort higher and hold a lock are never reserved.
+ if (txdir.compareTo(lastReserved) <= 0)
+ continue;
+
if (defered.containsKey(tid)) {
if (defered.get(tid) < System.currentTimeMillis())
defered.remove(tid);
else
continue;
}
- if (!reserved.contains(tid))
+ if (!reserved.contains(tid)) {
reserved.add(tid);
+ lastReserved = txdir;
+ }
else
continue;
}
Propchange: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Oct 15 22:17:22 2012
@@ -0,0 +1,17 @@
+/accumulo/branches/1.3/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1309369,1328076,1330246,1330264,1330944,1349971,1354669
+/accumulo/branches/1.3/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1309369,1328076,1330246,1349971,1354669
+/accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1305403-1382577,1382613,1388120,1388629,1393868,1396065,1396572,1396616,1396758,1396772,1397048,1397113,1397117,1397176,1397189,1397383,1397700,1397921,1398286,1398308,1398359,1398393,1398399,1398438
+/accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1305403-1356900,1358206,1363430,1364778,1365213,1382566,1382923,1388120,1396772,1397048,1397113,1397117,1397176,1397189,1397383,1397700,1397921,1398286,1398308,1398359,1398393,1398399,1398438
+/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/ZooStore.java:1393868
+/accumulo/branches/ACCUMULO-672/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1357826,1357829,1357842,1357858,1358236,1359163
+/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1343822-1391624,1391755-1398536
+/accumulo/trunk/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1329425,1332224,1332278,1332347,1333047,1333070,1341000,1342373,1350779,1351691,1356400,1359721
+/incubator/accumulo/branches/1.3/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632
+/incubator/accumulo/branches/1.3/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632
+/incubator/accumulo/branches/1.3.5rc/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1209938
+/incubator/accumulo/branches/1.3.5rc/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1209938
+/incubator/accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1201902-1305402
+/incubator/accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1201899-1305402
+/incubator/accumulo/branches/1.4.0rc/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1304025,1305326
+/incubator/accumulo/branches/1.4.0rc/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1304025,1305326
+/incubator/accumulo/trunk/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:1178656-1201898,1205476,1205570,1208726,1222413,1222719,1222725,1222733-1222734,1296160-1296495
Modified: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (original)
+++ accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java Mon Oct 15 22:17:22 2012
@@ -77,6 +77,8 @@ class ZooSession {
int sleepTime = 100;
ZooKeeper zooKeeper = null;
+ long startTime = System.currentTimeMillis();
+
while (tryAgain) {
try {
zooKeeper = new ZooKeeper(host, timeout, watcher);
@@ -89,6 +91,10 @@ class ZooSession {
} else
UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
}
+
+ if (System.currentTimeMillis() - startTime > 2 * timeout)
+ throw new RuntimeException("Failed to connect to zookeeper (" + host + ") within 2x zookeeper timeout period " + timeout);
+
} catch (UnknownHostException uhe) {
// do not expect to recover from this
log.warn(uhe.getClass().getName() + " : " + uhe.getMessage());
Propchange: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Oct 15 22:17:22 2012
@@ -0,0 +1,17 @@
+/accumulo/branches/1.3/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1309369,1328076,1330246,1330264,1330944,1349971,1354669
+/accumulo/branches/1.3/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1309369,1328076,1330246,1349971,1354669
+/accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1305403-1382577,1382613,1388120,1388629,1393868,1396065,1396572,1396616,1396758,1396772,1397048,1397113,1397117,1397176,1397189,1397383,1397700,1397921,1398286,1398308,1398359,1398393,1398399,1398438
+/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:1398399
+/accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1305403-1356900,1358206,1363430,1364778,1365213,1382566,1382923,1388120,1396772,1397048,1397113,1397117,1397176,1397189,1397383,1397700,1397921,1398286,1398308,1398359,1398393,1398399,1398438
+/accumulo/branches/ACCUMULO-672/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1357826,1357829,1357842,1357858,1358236,1359163
+/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1343822-1391624,1391755-1398536
+/accumulo/trunk/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1329425,1332224,1332278,1332347,1333047,1333070,1341000,1342373,1350779,1351691,1356400,1359721
+/incubator/accumulo/branches/1.3/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632
+/incubator/accumulo/branches/1.3/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632
+/incubator/accumulo/branches/1.3.5rc/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1209938
+/incubator/accumulo/branches/1.3.5rc/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1209938
+/incubator/accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1201902-1305402
+/incubator/accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1201899-1305402
+/incubator/accumulo/branches/1.4.0rc/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1304025,1305326
+/incubator/accumulo/branches/1.4.0rc/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1304025,1305326
+/incubator/accumulo/trunk/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:1178656-1201898,1205476,1205570,1208726,1222413,1222719,1222725,1222733-1222734,1296160-1296495
Modified: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java (original)
+++ accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java Mon Oct 15 22:17:22 2012
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -237,20 +238,26 @@ public class ZooUtil {
public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
- List<String> children = zk.getChildren(lid.path, false);
-
- if (children.size() == 0) {
- return false;
+ while (true) {
+ try {
+ List<String> children = zk.getChildren(lid.path, false);
+
+ if (children.size() == 0) {
+ return false;
+ }
+
+ Collections.sort(children);
+
+ String lockNode = children.get(0);
+ if (!lid.node.equals(lockNode))
+ return false;
+
+ Stat stat = zk.exists(lid.path + "/" + lid.node, false);
+ return stat != null && stat.getEphemeralOwner() == lid.eid;
+ } catch (KeeperException.ConnectionLossException ex) {
+ UtilWaitThread.sleep(1000);
+ }
}
-
- Collections.sort(children);
-
- String lockNode = children.get(0);
- if (!lid.node.equals(lockNode))
- return false;
-
- Stat stat = zk.exists(lid.path + "/" + lid.node, false);
- return stat != null && stat.getEphemeralOwner() == lid.eid;
}
}
Modified: accumulo/branches/ACCUMULO-259/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/pom.xml?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/pom.xml (original)
+++ accumulo/branches/ACCUMULO-259/pom.xml Mon Oct 15 22:17:22 2012
@@ -195,6 +195,16 @@
<pluginManagement>
<plugins>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>cobertura-maven-plugin</artifactId>
+ <version>2.5.2</version>
+ <configuration>
+ <formats>
+ <format>xml</format>
+ </formats>
+ </configuration>
+ </plugin>
+ <plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
@@ -367,7 +377,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>cobertura-maven-plugin</artifactId>
- <version>2.5.1</version>
+ <version>2.5.2</version>
<configuration>
<formats>
<format>xml</format>
Propchange: accumulo/branches/ACCUMULO-259/server/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.4/server:r1393868,1396065,1396572,1396616,1396758,1396772,1397048,1397113,1397117,1397176,1397189,1397383,1397700,1397921,1398286,1398308,1398359,1398393,1398399,1398438
Merged /accumulo/branches/1.4/src/server:r1393868,1396772,1397048,1397113,1397117,1397176,1397189,1397383,1397700,1397921,1398286,1398308,1398359,1398393,1398399,1398438
Merged /accumulo/trunk/server:r1391755-1398536
Modified: accumulo/branches/ACCUMULO-259/server/src/main/c++/Makefile
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/c%2B%2B/Makefile?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/c++/Makefile (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/c++/Makefile Mon Oct 15 22:17:22 2012
@@ -17,7 +17,11 @@
all: nm ml
nm:
- cd nativeMap ; make
+ cd nativeMap ; $(MAKE)
ml:
- cd mlock ; make
+ cd mlock ; $(MAKE)
+
+clean:
+ cd nativeMap ; $(MAKE) $@
+ cd mlock ; $(MAKE) $@
Modified: accumulo/branches/ACCUMULO-259/server/src/main/c++/mlock/Makefile
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/c%2B%2B/mlock/Makefile?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/c++/mlock/Makefile (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/c++/mlock/Makefile Mon Oct 15 22:17:22 2012
@@ -19,13 +19,33 @@ HDRS=$(wildcard *.h) org_apache_accumulo
CXX=g++
ifeq ($(shell uname),Linux)
-LIBS:= lib$(LIB)-Linux-amd64-64.so lib$(LIB)-Linux-i386-32.so
-CXXFLAGS=-g -fPIC -shared -O2 -fno-omit-frame-pointer -fno-strict-aliasing -Wall -I$(JAVA_HOME)/include/linux -I$(JAVA_HOME)/include
+ LIBS_32 := lib$(LIB)-Linux-i386-32.so
+ LIBS_64 := lib$(LIB)-Linux-amd64-64.so
+
+ ifneq ($(DARCH),)
+ ifeq ($(DARCH),64)
+ LIBS := $(LIBS_64)
+ endif
+ ifeq ($(DARCH),32)
+ LIBS := $(LIBS_32)
+ endif
+ ifeq ($(DARCH),both)
+ LIBS := $(LIBS_64)$(LIBS_32)
+ endif
+ ifeq ($(LIBS),)
+ LIBS := $(LIBS_64)$(LIBS_32)
+ endif
+ else
+ DARCH := $(shell getconf LONG_BIT)
+ LIBS := $(LIBS_$(DARCH))
+ endif
+
+ CXXFLAGS=-g -fPIC -shared -O2 -fno-omit-frame-pointer -fno-strict-aliasing -Wall -I$(JAVA_HOME)/include/linux -I$(JAVA_HOME)/include
endif
ifeq ($(shell uname),Darwin)
-LIBS:= lib$(LIB)-Mac_OS_X-x86_64-64.jnilib
-CXXFLAGS=-m64 -dynamiclib -O3 -I/System/Library/Frameworks/JavaVM.framework/Headers
+ LIBS:= lib$(LIB)-Mac_OS_X-x86_64-64.jnilib
+ CXXFLAGS=-m64 -dynamiclib -O3 -I/System/Library/Frameworks/JavaVM.framework/Headers
endif
INSTALL_DIR=../../../../../lib/native/mlock
Modified: accumulo/branches/ACCUMULO-259/server/src/main/c++/nativeMap/Makefile
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/c%2B%2B/nativeMap/Makefile?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/c++/nativeMap/Makefile (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/c++/nativeMap/Makefile Mon Oct 15 22:17:22 2012
@@ -18,13 +18,33 @@ HDRS=$(wildcard *.h) org_apache_accumulo
CXX=g++
ifeq ($(shell uname),Linux)
-LIBS:= libNativeMap-Linux-amd64-64.so libNativeMap-Linux-i386-32.so
-CXXFLAGS=-g -fPIC -shared -O2 -fno-omit-frame-pointer -fno-strict-aliasing -Wall -I$(JAVA_HOME)/include/linux -I$(JAVA_HOME)/include
+ LIBS_32 := libNativeMap-Linux-i386-32.so
+ LIBS_64 := libNativeMap-Linux-amd64-64.so
+
+ ifneq ($(DARCH),)
+ ifeq ($(DARCH),64)
+ LIBS := $(LIBS_64)
+ endif
+ ifeq ($(DARCH),32)
+ LIBS := $(LIBS_32)
+ endif
+ ifeq ($(DARCH),both)
+ LIBS := $(LIBS_64)$(LIBS_32)
+ endif
+ ifeq ($(LIBS),)
+ LIBS := $(LIBS_64)$(LIBS_32)
+ endif
+ else
+ DARCH := $(shell getconf LONG_BIT)
+ LIBS := $(LIBS_$(DARCH))
+ endif
+
+ CXXFLAGS=-g -fPIC -shared -O2 -fno-omit-frame-pointer -fno-strict-aliasing -Wall -I$(JAVA_HOME)/include/linux -I$(JAVA_HOME)/include
endif
ifeq ($(shell uname),Darwin)
-LIBS:= libNativeMap-Mac_OS_X-x86_64-64.jnilib
-CXXFLAGS=-m64 -dynamiclib -O3 -I/System/Library/Frameworks/JavaVM.framework/Headers
+ LIBS:= libNativeMap-Mac_OS_X-x86_64-64.jnilib
+ CXXFLAGS=-m64 -dynamiclib -O3 -I/System/Library/Frameworks/JavaVM.framework/Headers
endif
INSTALL_DIR=../../../../../lib/native/map
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/Accumulo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/Accumulo.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/Accumulo.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/Accumulo.java Mon Oct 15 22:17:22 2012
@@ -128,7 +128,7 @@ public class Accumulo {
sortedProps.put(entry.getKey(), entry.getValue());
for (Entry<String,String> entry : sortedProps.entrySet()) {
- if (entry.getKey().toLowerCase().contains("password"))
+ if (entry.getKey().toLowerCase().contains("password") || entry.getKey().toLowerCase().contains("secret"))
log.info(entry.getKey() + " = <hidden>");
else
log.info(entry.getKey() + " = " + entry.getValue());
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/logger/LogFileValue.java Mon Oct 15 22:17:22 2012
@@ -22,32 +22,37 @@ package org.apache.accumulo.server.logge
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.server.data.ServerMutation;
import org.apache.hadoop.io.Writable;
public class LogFileValue implements Writable {
- private static final Mutation[] empty = new Mutation[0];
+ private static final List<Mutation> empty = Collections.emptyList();
- public Mutation[] mutations = empty;
+ public List<Mutation> mutations = empty;
@Override
public void readFields(DataInput in) throws IOException {
int count = in.readInt();
- mutations = new Mutation[count];
+ mutations = new ArrayList<Mutation>(count);
for (int i = 0; i < count; i++) {
- mutations[i] = new Mutation();
- mutations[i].readFields(in);
+ ServerMutation mutation = new ServerMutation();
+ mutation.readFields(in);
+ mutations.add(mutation);
}
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(mutations.length);
- for (int i = 0; i < mutations.length; i++) {
- mutations[i].write(out);
+ out.writeInt(mutations.size());
+ for (Mutation m : mutations) {
+ m.write(out);
}
}
@@ -63,16 +68,16 @@ public class LogFileValue implements Wri
}
public static String format(LogFileValue lfv, int maxMutations) {
- if (lfv.mutations.length == 0)
+ if (lfv.mutations.size() == 0)
return "";
StringBuilder builder = new StringBuilder();
- builder.append(lfv.mutations.length + " mutations:\n");
- for (int i = 0; i < lfv.mutations.length; i++) {
- if (i >= maxMutations) {
+ builder.append(lfv.mutations.size() + " mutations:\n");
+ int i = 0;
+ for (Mutation m : lfv.mutations) {
+ if (i++ >= maxMutations) {
builder.append("...");
break;
}
- Mutation m = lfv.mutations[i];
builder.append(" " + new String(m.getRow()) + "\n");
for (ColumnUpdate update : m.getUpdates()) {
String value = new String(update.getValue());
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java Mon Oct 15 22:17:22 2012
@@ -50,6 +50,12 @@ public class DefaultLoadBalancer extends
tableToBalance = table;
}
+ List<TServerInstance> randomize(Set<TServerInstance> locations) {
+ List<TServerInstance> result = new ArrayList<TServerInstance>(locations);
+ Collections.shuffle(result);
+ return result;
+ }
+
public TServerInstance getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations, KeyExtent extent, TServerInstance last) {
if (locations.size() == 0)
return null;
@@ -68,11 +74,11 @@ public class DefaultLoadBalancer extends
// The strategy here is to walk through the locations and hand them back, one at a time
// Grab an iterator off of the set of options; use a new iterator if it hands back something not in the current list.
if (assignments == null || !assignments.hasNext())
- assignments = locations.keySet().iterator();
+ assignments = randomize(locations.keySet()).iterator();
TServerInstance result = assignments.next();
if (!locations.containsKey(result)) {
assignments = null;
- return locations.keySet().iterator().next();
+ return randomize(locations.keySet()).iterator().next();
}
return result;
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java Mon Oct 15 22:17:22 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.server.metan
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
+import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
@@ -93,7 +94,7 @@ public class PrintEvents {
m.readFields(dis);
LogFileValue lfv = new LogFileValue();
- lfv.mutations = new Mutation[] {m};
+ lfv.mutations = Collections.singletonList(m);
System.out.println(LogFileValue.format(lfv, 1));
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/MutationLog.java Mon Oct 15 22:17:22 2012
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -65,7 +66,7 @@ public class MutationLog {
final FSDataInputStream login = fs.open(logfile);
- final Mutation mutation = new Mutation();
+ final Mutation mutation = new ServerMutation();
return new Iterator<Mutation>() {
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon Oct 15 22:17:22 2012
@@ -142,6 +142,7 @@ import org.apache.accumulo.server.client
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.master.state.Assignment;
@@ -195,6 +196,7 @@ import org.apache.accumulo.server.zookee
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.Platform;
+import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -1435,7 +1437,7 @@ public class TabletServer extends Abstra
if (us.currentTablet != null) {
List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
for (TMutation tmutation : tmutations) {
- Mutation mutation = new Mutation(tmutation);
+ Mutation mutation = new ServerMutation(tmutation);
mutations.add(mutation);
us.queuedMutationSize += mutation.numBytes();
}
@@ -1646,7 +1648,7 @@ public class TabletServer extends Abstra
long opid = writeTracker.startWrite(TabletType.type(keyExtent));
try {
- Mutation mutation = new Mutation(tmutation);
+ Mutation mutation = new ServerMutation(tmutation);
List<Mutation> mutations = Collections.singletonList(mutation);
Span prep = Trace.start("prep");
@@ -2296,8 +2298,10 @@ public class TabletServer extends Abstra
if (t == null) {
// Tablet has probably been recently unloaded: repeated master
// unload request is crossing the successful unloaded message
- log.info("told to unload tablet that was not being served " + extent);
- enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOAD_FAILURE_NOT_SERVING, extent));
+ if (!recentlyUnloadedCache.containsKey(extent)) {
+ log.info("told to unload tablet that was not being served " + extent);
+ enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOAD_FAILURE_NOT_SERVING, extent));
+ }
return;
}
@@ -2317,6 +2321,7 @@ public class TabletServer extends Abstra
// stop serving tablet - client will get not serving tablet
// exceptions
+ recentlyUnloadedCache.put(extent, System.currentTimeMillis());
onlineTablets.remove(extent);
try {
@@ -2483,6 +2488,7 @@ public class TabletServer extends Abstra
openingTablets.remove(extentToOpen);
onlineTablets.put(extentToOpen, tablet);
openingTablets.notifyAll();
+ recentlyUnloadedCache.remove(tablet);
}
}
tablet = null; // release this reference
@@ -2536,6 +2542,8 @@ public class TabletServer extends Abstra
private SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
private SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
private SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
+ @SuppressWarnings("unchecked")
+ private Map<KeyExtent,Long> recentlyUnloadedCache = (Map<KeyExtent, Long>)Collections.synchronizedMap(new LRUMap(1000));
private Thread majorCompactorThread;
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java Mon Oct 15 22:17:22 2012
@@ -19,13 +19,12 @@
*/
package org.apache.accumulo.server.tabletserver;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.admin.TimeType;
-import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.util.time.RelativeTime;
public abstract class TabletTime {
@@ -57,13 +56,8 @@ public abstract class TabletTime {
abstract long getAndUpdateTime();
protected void setSystemTimes(Mutation mutation, long lastCommitTime) {
- Collection<ColumnUpdate> updates = mutation.getUpdates();
- for (ColumnUpdate cvp : updates) {
- if (!cvp.hasTimestamp()) {
- cvp.setSystemTimestamp(lastCommitTime);
-
- }
- }
+ ServerMutation m = (ServerMutation)mutation;
+ m.setSystemTimestamp(lastCommitTime);
}
static TabletTime getInstance(String metadataValue) {
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon Oct 15 22:17:22 2012
@@ -37,12 +37,12 @@ import org.apache.accumulo.core.conf.Acc
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.StringUtil;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.tabletserver.TabletMutations;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -310,7 +310,7 @@ public class DfsLogger {
}
public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
- return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation.toThrift()))));
+ return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
}
public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
@@ -318,16 +318,13 @@ public class DfsLogger {
synchronized (DfsLogger.this) {
try {
- for (TabletMutations mutation : mutations) {
+ for (TabletMutations tabletMutations : mutations) {
LogFileKey key = new LogFileKey();
key.event = MANY_MUTATIONS;
- key.seq = mutation.seq;
- key.tid = mutation.tabletID;
+ key.seq = tabletMutations.getSeq();
+ key.tid = tabletMutations.getTid();
LogFileValue value = new LogFileValue();
- Mutation[] m = new Mutation[mutation.mutations.size()];
- for (int i = 0; i < m.length; i++)
- m[i] = new Mutation(mutation.mutations.get(i));
- value.mutations = m;
+ value.mutations = tabletMutations.getMutations();
write(key, value);
}
} catch (Exception e) {
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java Mon Oct 15 22:17:22 2012
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Set;
import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -208,10 +209,10 @@ public class SortedLogRecovery {
// log.info("Replaying " + key);
// log.info(value);
if (key.event == MUTATION) {
- mr.receive(value.mutations[0]);
+ mr.receive(value.mutations.get(0));
} else if (key.event == MANY_MUTATIONS) {
- for (int i = 0; i < value.mutations.length; i++) {
- mr.receive(value.mutations[i]);
+ for (Mutation m : value.mutations) {
+ mr.receive(m);
}
} else {
throw new RuntimeException("unexpected log key type: " + key.event);
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Mon Oct 15 22:17:22 2012
@@ -33,11 +33,10 @@ import java.util.concurrent.locks.Reentr
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.thrift.TMutation;
-import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.tabletserver.Tablet;
import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
+import org.apache.accumulo.server.tabletserver.TabletMutations;
import org.apache.accumulo.server.tabletserver.TabletServer;
import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation;
import org.apache.log4j.Logger;
@@ -366,10 +365,7 @@ public class TabletServerLogger {
List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
CommitSession cs = entry.getKey();
- ArrayList<TMutation> tmutations = new ArrayList<TMutation>(entry.getValue().size());
- for (Mutation m : entry.getValue())
- tmutations.add(m.toThrift());
- copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), tmutations));
+ copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue()));
}
return logger.logManyTablets(copy);
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousBatchWalker.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousBatchWalker.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousBatchWalker.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousBatchWalker.java Mon Oct 15 22:17:22 2012
@@ -32,7 +32,9 @@ import org.apache.accumulo.core.client.Z
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.test.continuous.ContinuousWalk.RandomAuths;
import org.apache.hadoop.io.Text;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
@@ -41,6 +43,7 @@ import org.apache.log4j.PatternLayout;
public class ContinuousBatchWalker {
private static String debugLog = null;
+ private static String authsFile = null;
private static String[] processOptions(String[] args) {
ArrayList<String> al = new ArrayList<String>();
@@ -48,6 +51,8 @@ public class ContinuousBatchWalker {
for (int i = 0; i < args.length; i++) {
if (args[i].equals("--debug")) {
debugLog = args[++i];
+ } else if (args[i].equals("--auths")) {
+ authsFile = args[++i];
} else {
al.add(args[i]);
}
@@ -62,7 +67,7 @@ public class ContinuousBatchWalker {
if (args.length != 10) {
throw new IllegalArgumentException("usage : " + ContinuousBatchWalker.class.getName()
- + " [--debug <debug log>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time> <batch size> <query threads>");
+ + " [--debug <debug log>] [--auths <file>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time> <batch size> <query threads>");
}
if (debugLog != null) {
@@ -88,12 +93,14 @@ public class ContinuousBatchWalker {
int batchSize = Integer.parseInt(args[8]);
int numQueryThreads = Integer.parseInt(args[9]);
- Connector conn = new ZooKeeperInstance(instanceName, zooKeepers).getConnector(user, password.getBytes());
- Scanner scanner = conn.createScanner(table, Constants.NO_AUTHS);
- BatchScanner bs = conn.createBatchScanner(table, Constants.NO_AUTHS, numQueryThreads);
-
Random r = new Random();
-
+ RandomAuths randomAuths = new RandomAuths(authsFile);
+ Authorizations auths = randomAuths.getAuths(r);
+
+ Connector conn = new ZooKeeperInstance(instanceName, zooKeepers).getConnector(user, password.getBytes());
+ Scanner scanner = conn.createScanner(table, auths);
+ BatchScanner bs = conn.createBatchScanner(table, auths, numQueryThreads);
+
while (true) {
Set<Text> batch = getBatch(scanner, min, max, batchSize, r);
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java Mon Oct 15 22:17:22 2012
@@ -16,8 +16,12 @@
*/
package org.apache.accumulo.server.test.continuous;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -38,8 +42,12 @@ import org.apache.accumulo.core.client.T
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.test.FastFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
@@ -49,15 +57,44 @@ import org.apache.log4j.PatternLayout;
public class ContinuousIngest {
+ private static String visFile = null;
private static String debugLog = null;
private static final byte[] EMPTY_BYTES = new byte[0];
+ private static List<ColumnVisibility> visibilities;
+
+ private static void initVisibilities() throws Exception {
+ if (visFile == null) {
+ visibilities = Collections.singletonList(new ColumnVisibility());
+ return;
+ }
+
+ visibilities = new ArrayList<ColumnVisibility>();
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(visFile))));
+
+ String line;
+
+ while ((line = in.readLine()) != null) {
+ visibilities.add(new ColumnVisibility(line));
+ }
+
+ in.close();
+ }
+
+ private static ColumnVisibility getVisibility(Random rand) {
+ return visibilities.get(rand.nextInt(visibilities.size()));
+ }
+
private static String[] processOptions(String[] args) {
ArrayList<String> al = new ArrayList<String>();
for (int i = 0; i < args.length; i++) {
if (args[i].equals("--debug")) {
debugLog = args[++i];
+ } else if (args[i].equals("--visibilities")) {
+ visFile = args[++i];
} else {
al.add(args[i]);
}
@@ -74,7 +111,7 @@ public class ContinuousIngest {
throw new IllegalArgumentException(
"usage : "
+ ContinuousIngest.class.getName()
- + " [--debug <debug log>] <instance name> <zookeepers> <user> <pass> <table> <num> <min> <max> <max colf> <max colq> <max mem> <max latency> <max threads> <enable checksum>");
+ + " [--debug <debug log>] [--visibilities <file>] <instance name> <zookeepers> <user> <pass> <table> <num> <min> <max> <max colf> <max colq> <max mem> <max latency> <max threads> <enable checksum>");
}
if (debugLog != null) {
@@ -84,6 +121,8 @@ public class ContinuousIngest {
logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
}
+ initVisibilities();
+
String instanceName = args[0];
String zooKeepers = args[1];
@@ -145,6 +184,8 @@ public class ContinuousIngest {
out: while (true) {
// generate first set of nodes
+ ColumnVisibility cv = getVisibility(r);
+
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(min, max, r);
prevRows[index] = rowLong;
@@ -156,7 +197,7 @@ public class ContinuousIngest {
firstColFams[index] = cf;
firstColQuals[index] = cq;
- Mutation m = genMutation(rowLong, cf, cq, ingestInstanceId, count, null, r, checksum);
+ Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, checksum);
count++;
bw.addMutation(m);
}
@@ -171,7 +212,7 @@ public class ContinuousIngest {
long rowLong = genLong(min, max, r);
byte[] prevRow = genRow(prevRows[index]);
prevRows[index] = rowLong;
- Mutation m = genMutation(rowLong, r.nextInt(maxColF), r.nextInt(maxColQ), ingestInstanceId, count, prevRow, r, checksum);
+ Mutation m = genMutation(rowLong, r.nextInt(maxColF), r.nextInt(maxColQ), cv, ingestInstanceId, count, prevRow, r, checksum);
count++;
bw.addMutation(m);
}
@@ -184,7 +225,8 @@ public class ContinuousIngest {
// create one big linked list, this makes all of the first inserts
// point to something
for (int index = 0; index < flushInterval - 1; index++) {
- Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], ingestInstanceId, count, genRow(prevRows[index + 1]), r, checksum);
+ Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r,
+ checksum);
count++;
bw.addMutation(m);
}
@@ -195,7 +237,7 @@ public class ContinuousIngest {
bw.close();
}
-
+
private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException {
long t1 = System.currentTimeMillis();
bw.flush();
@@ -205,7 +247,8 @@ public class ContinuousIngest {
return lastFlushTime;
}
- public static Mutation genMutation(long rowLong, int cfInt, int cqInt, byte[] ingestInstanceId, long count, byte[] prevRow, Random r, boolean checksum) {
+ public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r,
+ boolean checksum) {
// Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead
CRC32 cksum = null;
@@ -219,11 +262,12 @@ public class ContinuousIngest {
cksum.update(rowString);
cksum.update(cfString);
cksum.update(cqString);
+ cksum.update(cv.getExpression());
}
Mutation m = new Mutation(new Text(rowString));
- m.put(new Text(cfString), new Text(cqString), createValue(ingestInstanceId, count, prevRow, cksum));
+ m.put(new Text(cfString), new Text(cqString), cv, createValue(ingestInstanceId, count, prevRow, cksum));
return m;
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java Mon Oct 15 22:17:22 2012
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -64,6 +65,8 @@ public class ContinuousMoru extends Conf
private byte[] iiId;
private long count;
+ private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility();
+
public void setup(Context context) throws IOException, InterruptedException {
int max_cf = context.getConfiguration().getInt(MAX_CF, -1);
int max_cq = context.getConfiguration().getInt(MAX_CQ, -1);
@@ -92,7 +95,8 @@ public class ContinuousMoru extends Conf
int offset = ContinuousWalk.getPrevRowOffset(val);
if (offset > 0) {
long rowLong = Long.parseLong(new String(val, offset, 16), 16);
- Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), iiId, count++, key.getRowData().toArray(), random,
+ Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData()
+ .toArray(), random,
true);
context.write(null, m);
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousScanner.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousScanner.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousScanner.java Mon Oct 15 22:17:22 2012
@@ -29,7 +29,9 @@ import org.apache.accumulo.core.client.Z
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.test.continuous.ContinuousWalk.RandomAuths;
import org.apache.hadoop.io.Text;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
@@ -38,6 +40,7 @@ import org.apache.log4j.PatternLayout;
public class ContinuousScanner {
private static String debugLog = null;
+ private static String authsFile = null;
private static String[] processOptions(String[] args) {
ArrayList<String> al = new ArrayList<String>();
@@ -45,6 +48,8 @@ public class ContinuousScanner {
for (int i = 0; i < args.length; i++) {
if (args[i].equals("--debug")) {
debugLog = args[++i];
+ } else if (args[i].equals("--auths")) {
+ authsFile = args[++i];
} else {
al.add(args[i]);
}
@@ -59,7 +64,7 @@ public class ContinuousScanner {
if (args.length != 9) {
throw new IllegalArgumentException("usage : " + ContinuousScanner.class.getName()
- + " [--debug <debug log>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time> <num to scan>");
+ + " [--debug <debug log>] [--auths <file>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time> <num to scan>");
}
if (debugLog != null) {
@@ -69,6 +74,8 @@ public class ContinuousScanner {
logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
}
+ Random r = new Random();
+
String instanceName = args[0];
String zooKeepers = args[1];
@@ -85,11 +92,12 @@ public class ContinuousScanner {
int numToScan = Integer.parseInt(args[8]);
+ RandomAuths randomAuths = new RandomAuths(authsFile);
+
Instance instance = new ZooKeeperInstance(instanceName, zooKeepers);
Connector conn = instance.getConnector(user, password.getBytes());
- Scanner scanner = conn.createScanner(table, Constants.NO_AUTHS);
-
- Random r = new Random();
+ Authorizations auths = randomAuths.getAuths(r);
+ Scanner scanner = conn.createScanner(table, auths);
double delta = Math.min(.05, .05 / (numToScan / 1000.0));
// System.out.println("Delta "+delta);
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java Mon Oct 15 22:17:22 2012
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.Random;
import java.util.Set;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
@@ -135,6 +136,20 @@ public class ContinuousVerify extends Co
@Override
public int run(String[] args) throws Exception {
+
+ String auths = "";
+ ArrayList<String> argsList = new ArrayList<String>();
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("--auths")) {
+ auths = args[++i];
+ } else {
+ argsList.add(args[i]);
+ }
+ }
+
+ args = argsList.toArray(new String[0]);
+
if (args.length != 9) {
throw new IllegalArgumentException("Usage : " + ContinuousVerify.class.getName()
+ " <instance name> <zookeepers> <user> <pass> <table> <output dir> <max mappers> <num reducers> <scan offline>");
@@ -165,9 +180,16 @@ public class ContinuousVerify extends Co
}
job.setInputFormatClass(AccumuloInputFormat.class);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), clone, new Authorizations());
+ Authorizations authorizations;
+ if (auths == null || auths.trim().equals(""))
+ authorizations = Constants.NO_AUTHS;
+ else
+ authorizations = new Authorizations(auths.split(","));
+
+ AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, pass.getBytes(), clone, authorizations);
AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
AccumuloInputFormat.setScanOffline(job.getConfiguration(), scanOffline);
+
// set up ranges
try {
Set<Range> ranges = new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass.getBytes()).tableOperations()
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousWalk.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousWalk.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousWalk.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousWalk.java Mon Oct 15 22:17:22 2012
@@ -16,8 +16,13 @@
*/
package org.apache.accumulo.server.test.continuous;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.zip.CRC32;
@@ -37,6 +42,9 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.Accumulo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
@@ -47,6 +55,7 @@ import org.apache.log4j.PatternLayout;
public class ContinuousWalk {
private static String debugLog = null;
+ private static String authsFile = null;
static class BadChecksumException extends RuntimeException {
@@ -64,6 +73,8 @@ public class ContinuousWalk {
for (int i = 0; i < args.length; i++) {
if (args[i].equals("--debug")) {
debugLog = args[++i];
+ } else if (args[i].equals("--auths")) {
+ authsFile = args[++i];
} else {
al.add(args[i]);
}
@@ -72,13 +83,41 @@ public class ContinuousWalk {
return al.toArray(new String[al.size()]);
}
+ /**
+  * Holds a list of {@link Authorizations} and hands out a randomly chosen one on request.
+  * <p>
+  * With no file the list contains only {@code Constants.NO_AUTHS}. With a file, each line is split on commas and turned into one {@link Authorizations}.
+  */
+ static class RandomAuths {
+   private final List<Authorizations> auths;
+
+   /**
+    * @param file
+    *          path of the authorizations file, opened through {@code FileSystem.get(new Configuration())}; may be null
+    * @throws IOException
+    *           if the file cannot be opened or read
+    * @throws IllegalArgumentException
+    *           if the file contains no lines; {@link #getAuths(Random)} could not pick from an empty list
+    */
+   RandomAuths(String file) throws IOException {
+     if (file == null) {
+       auths = Collections.singletonList(Constants.NO_AUTHS);
+       return;
+     }
+
+     List<Authorizations> loaded = new ArrayList<Authorizations>();
+
+     FileSystem fs = FileSystem.get(new Configuration());
+     BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file))));
+     try {
+       String line;
+       while ((line = in.readLine()) != null) {
+         loaded.add(new Authorizations(line.split(",")));
+       }
+     } finally {
+       // Close the reader even when readLine() or the Authorizations constructor throws.
+       in.close();
+     }
+
+     if (loaded.isEmpty()) {
+       // Fail here with a clear message instead of later inside Random.nextInt(0).
+       throw new IllegalArgumentException("auths file " + file + " contains no authorizations");
+     }
+
+     auths = loaded;
+   }
+
+   /**
+    * @param r
+    *          source of randomness used to choose the list index
+    * @return the {@link Authorizations} at the randomly chosen index
+    */
+   Authorizations getAuths(Random r) {
+     return auths.get(r.nextInt(auths.size()));
+   }
+ }
+
public static void main(String[] args) throws Exception {
args = processOptions(args);
if (args.length != 8) {
throw new IllegalArgumentException("usage : " + ContinuousWalk.class.getName()
- + " [--debug <debug log>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time>");
+ + " [--debug <debug log>] [--auths <file>] <instance name> <zookeepers> <user> <pass> <table> <min> <max> <sleep time>");
}
if (debugLog != null) {
@@ -108,13 +147,14 @@ public class ContinuousWalk {
Tracer.getInstance().addReceiver(new ZooSpanClient(zooKeepers, path, localhost, "cwalk", 1000));
Accumulo.enableTracing(localhost, "ContinuousWalk");
Connector conn = instance.getConnector(user, password.getBytes());
- Scanner scanner = conn.createScanner(table, new Authorizations());
Random r = new Random();
+ RandomAuths randomAuths = new RandomAuths(authsFile);
ArrayList<Value> values = new ArrayList<Value>();
while (true) {
+ Scanner scanner = conn.createScanner(table, randomAuths.getAuths(r));
String row = findAStartRow(min, max, scanner, r);
while (row != null) {
@@ -228,6 +268,7 @@ public class ContinuousWalk {
cksum.update(key.getRowData().toArray());
cksum.update(key.getColumnFamilyData().toArray());
cksum.update(key.getColumnQualifierData().toArray());
+ cksum.update(key.getColumnVisibilityData().toArray());
cksum.update(value.get(), 0, ckOff);
if (cksum.getValue() != storedCksum) {
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Ingest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Ingest.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Ingest.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Ingest.java Mon Oct 15 22:17:22 2012
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.client.C
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.server.test.continuous.ContinuousIngest;
public class Ingest extends ScaleTest {
@@ -91,6 +92,8 @@ public class Ingest extends ScaleTest {
long count = 0;
long totalBytes = 0;
+ ColumnVisibility cv = new ColumnVisibility();
+
// start timer
startTimer();
@@ -98,7 +101,7 @@ public class Ingest extends ScaleTest {
while (count < numIngestEntries) {
count++;
long rowId = ContinuousIngest.genLong(minRow, maxRow, r);
- Mutation m = ContinuousIngest.genMutation(rowId, r.nextInt(maxColF), r.nextInt(maxColQ), ingestInstanceId.getBytes(), count, null, r, false);
+ Mutation m = ContinuousIngest.genMutation(rowId, r.nextInt(maxColF), r.nextInt(maxColQ), cv, ingestInstanceId.getBytes(), count, null, r, false);
totalBytes += m.numBytes();
try {
bw.addMutation(m);
Modified: accumulo/branches/ACCUMULO-259/server/src/test/java/org/apache/accumulo/server/logger/LogFileTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/test/java/org/apache/accumulo/server/logger/LogFileTest.java?rev=1398539&r1=1398538&r2=1398539&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/test/java/org/apache/accumulo/server/logger/LogFileTest.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/test/java/org/apache/accumulo/server/logger/LogFileTest.java Mon Oct 15 22:17:22 2012
@@ -27,15 +27,11 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collection;
-import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.server.logger.LogEvents;
-import org.apache.accumulo.server.logger.LogFileKey;
-import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.data.ServerMutation;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -53,7 +49,7 @@ public class LogFileTest {
key.tablet = tablet;
key.tserverSession = keyResult.tserverSession;
LogFileValue value = new LogFileValue();
- value.mutations = mutations != null ? mutations : new Mutation[0];
+ value.mutations = Arrays.asList(mutations != null ? mutations : new Mutation[0]);
DataOutputBuffer out = new DataOutputBuffer();
key.write(out);
value.write(out);
@@ -63,20 +59,10 @@ public class LogFileTest {
keyResult.readFields(in);
valueResult.readFields(in);
assertTrue(key.compareTo(keyResult) == 0);
- assertTrue(Arrays.equals(value.mutations, valueResult.mutations));
+ assertEquals(value.mutations, valueResult.mutations);
assertTrue(in.read() == -1);
}
- static void assertEqualsMutations(Mutation[] a, Mutation[] b) {
- if (a.length == b.length)
- for (int i = 0; i < a.length; i++) {
- assertEquals(a[i], b[i]);
- Collection<ColumnUpdate> au = a[i].getUpdates();
- Collection<ColumnUpdate> bu = a[i].getUpdates();
- assertEquals(au, bu);
- }
- }
-
@Test
public void testReadFields() throws IOException {
LogFileKey key = new LogFileKey();
@@ -99,18 +85,18 @@ public class LogFileTest {
assertEquals(key.seq, 5);
assertEquals(key.tid, 6);
assertEquals(key.tablet, tablet);
- Mutation m = new Mutation(new Text("row"));
+ Mutation m = new ServerMutation(new Text("row"));
m.put(new Text("cf"), new Text("cq"), new Value("value".getBytes()));
readWrite(MUTATION, 7, 8, null, null, new Mutation[] {m}, key, value);
assertEquals(key.event, MUTATION);
assertEquals(key.seq, 7);
assertEquals(key.tid, 8);
- assertEqualsMutations(value.mutations, new Mutation[] {m});
+ assertEquals(value.mutations, Arrays.asList(m));
readWrite(MANY_MUTATIONS, 9, 10, null, null, new Mutation[] {m, m}, key, value);
assertEquals(key.event, MANY_MUTATIONS);
assertEquals(key.seq, 9);
assertEquals(key.tid, 10);
- assertEqualsMutations(value.mutations, new Mutation[] {m, m});
+ assertEquals(value.mutations, Arrays.asList(m, m));
}
@Test