You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:28:44 UTC
[06/50] [abbrv] git commit: ACCUMULO-378 Fix up some tests that
failed on jenkins and remove now unnecessary test
ACCUMULO-378 Fix up some tests that failed on jenkins and remove now unnecessary test
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f247c8e2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f247c8e2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f247c8e2
Branch: refs/heads/ACCUMULO-378
Commit: f247c8e251abb99765ed28b98873f22152d52f58
Parents: 91396e5
Author: Josh Elser <el...@apache.org>
Authored: Wed Apr 30 17:30:44 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Apr 30 17:30:44 2014 -0400
----------------------------------------------------------------------
.../replication/ReplicationDeadlockTest.java | 3 -
.../ReplicationTableTimestampIT.java | 245 -------------------
.../test/replication/ReplicationWithGCIT.java | 2 +-
.../replication/ReplicationWithMakerTest.java | 40 ++-
4 files changed, 32 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
index 9713c8c..d43aa32 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
@@ -17,7 +17,6 @@
package org.apache.accumulo.test.replication;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
@@ -36,9 +35,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
import org.apache.accumulo.server.replication.ReplicationTable;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java
deleted file mode 100644
index 3111164..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Integration Tests that attempt to evaluate the accuracy of the internal bookkeeping performed on the accumulo "master" instance. Does not send data to any
- * remote instance, merely tracks what is stored locally.
- */
-public class ReplicationTableTimestampIT extends ConfigurableMacIT {
- @Override
- public int defaultTimeoutSeconds() {
- return 300;
- }
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0s");
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
- cfg.setNumTservers(1);
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
- Multimap<String,String> logs = HashMultimap.create();
- Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- scanner.fetchColumnFamily(LogColumnFamily.NAME);
- scanner.setRange(new Range());
- for (Entry<Key,Value> entry : scanner) {
- if (Thread.interrupted()) {
- return logs;
- }
-
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
- for (String log : logEntry.logSet) {
- logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
- }
- }
- return logs;
- }
-
- @Test
- public void closedReplicationStatusStayClosed() throws Exception {
- final Connector conn = getConnector();
- String table1 = "table1", table2 = "table2", table3 = "table3";
- final Multimap<String,String> metadataTableWals = HashMultimap.create();
- final AtomicBoolean keepRunning = new AtomicBoolean(true);
-
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- // Should really be able to interrupt here, but the Scanner throws a fit to the logger
- // when that happens
- while (keepRunning.get()) {
- try {
- metadataTableWals.putAll(getLogs(conn));
- } catch (TableNotFoundException e) {
- log.error("Metadata table doesn't exist");
- }
- }
- }
-
- });
-
- t.start();
-
- conn.tableOperations().create(table1);
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- Thread.sleep(1000);
-
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- conn.tableOperations().create(table2);
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- Thread.sleep(1000);
-
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- conn.tableOperations().create(table3);
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- Thread.sleep(1000);
-
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Stop the thread which is constantly scanning metadata to track all WALs seen
- keepRunning.set(false);
- t.join(5000);
-
- // See which files we have for replication in the replication table
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Set<String> replFiles = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- replFiles.add(entry.getKey().getRow().toString());
- }
-
- // We might have a WAL that was used solely for the replication table
- // We want to remove that from our list as it should not appear in the replication table
- String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
- Iterator<Entry<String,String>> observedLogs = metadataTableWals.entries().iterator();
- while (observedLogs.hasNext()) {
- Entry<String,String> observedLog = observedLogs.next();
- if (replicationTableId.equals(observedLog.getValue())) {
- observedLogs.remove();
- }
- }
-
- // We should have *some* reference to each log that was seen in the metadata table
- // They might not yet all be closed though (might be newfile)
- Assert.assertEquals("Metadata log distribution: " + metadataTableWals, metadataTableWals.keySet(), replFiles);
-
- LinkedListMultimap<String,Entry<Key,Value>> kvByRow = LinkedListMultimap.create();
- s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- for (Entry<Key,Value> entry : s) {
- kvByRow.put(entry.getKey().getRow().toString(), entry);
- }
-
- for (String row : kvByRow.keySet()) {
- ArrayList<Entry<Key,Value>> kvs = new ArrayList<>(kvByRow.get(row));
- Collections.sort(kvs, new Comparator<Entry<Key,Value>>() {
- @Override
- public int compare(Entry<Key,Value> o1, Entry<Key,Value> o2) {
- return (new Long(o1.getKey().getTimestamp())).compareTo(new Long(o2.getKey().getTimestamp()));
- }
- });
-
- Key closedKey = null;
- boolean observedClosed = false;
- for (Entry<Key,Value> kv : kvs) {
- Status status = Status.parseFrom(kv.getValue().get());
-
- // Once we get a closed record, every subsequent record should *also* be closed
- // A file cannot be "re-opened"
- if (!observedClosed) {
- if (status.getClosed()) {
- closedKey = kv.getKey();
- observedClosed = true;
- }
- } else {
- Assert.assertTrue("Found a non-closed Status (" + kv.getKey().toStringNoTruncate() + ") after a closed Status (" + closedKey.toStringNoTruncate() + ") was observed", status.getClosed());
- }
- }
-
- }
-
- for (String replFile : replFiles) {
- Path p = new Path(replFile);
- FileSystem fs = p.getFileSystem(new Configuration());
- Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 3f6b8dc..449827b 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -496,7 +496,7 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
s.setRange(ReplicationSection.getRange());
recordsFound = 0;
- for (@SuppressWarnings("unused") Entry<Key,Value> entry : s) {
+ for (Entry<Key,Value> entry : s) {
recordsFound++;
log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
index 1ffb2a2..1c056ed 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
@@ -33,7 +33,9 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
import org.apache.accumulo.server.replication.ReplicationTable;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
import org.apache.hadoop.conf.Configuration;
@@ -58,12 +60,20 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+ // Run the process in the master which writes replication records from metadata to replication
+ // repeatedly without pause
cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
cfg.setNumTservers(1);
}
@Test
public void singleTableSingleTarget() throws Exception {
+ // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
+ // against expected Status messages.
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+ cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+ }
+
Connector conn = getConnector();
String table1 = "table1";
@@ -80,7 +90,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
// Replicate table1 to cluster1 in the table with id of '4'
conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
- attempts = 0;
+ break;
} catch (Exception e) {
attempts--;
if (attempts <= 0) {
@@ -113,26 +123,39 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
attempts--;
}
} while (!exists && attempts > 0);
- Assert.assertTrue("Replication table did not exist", exists);
+ Assert.assertTrue("Replication table was never created", exists);
// ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
for (int i = 0; i < 5 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
UtilWaitThread.sleep(1000);
}
- Assert.assertTrue("Did not find expected combiner", conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
+ Assert.assertTrue("Combiner was never set on replication table",
+ conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
+
+ // Trigger the minor compaction, waiting for it to finish.
+ // This should write the entry to metadata that the file has data
+ conn.tableOperations().flush(ReplicationTable.NAME, null, null, true);
// Make sure that we have one status element, should be a new file
Scanner s = ReplicationTable.getScanner(conn);
StatusSection.limit(s);
Entry<Key,Value> entry = null;
attempts = 5;
+ // This record will move from new to new with infinite length because of the minc (flush)
+ Status expectedStatus = StatusUtil.openWithUnknownLength();
while (null == entry && attempts > 0) {
- try{
+ try {
entry = Iterables.getOnlyElement(s);
+ if (!expectedStatus.equals(Status.parseFrom(entry.getValue().get()))) {
+ entry = null;
+ // the master process didn't yet fire and write the new mutation, wait for it to do
+ // so and try to read it again
+ Thread.sleep(1000);
+ }
} catch (NoSuchElementException e) {
entry = null;
- Thread.sleep(200);
+ Thread.sleep(500);
} catch (IllegalArgumentException e) {
// saw this contain 2 elements once
s = ReplicationTable.getScanner(conn);
@@ -146,8 +169,8 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
}
}
- Assert.assertNotNull(entry);
- Assert.assertEquals(StatusUtil.openWithUnknownLength(), Status.parseFrom(entry.getValue().get()));
+ Assert.assertNotNull("Could not find expected entry in replication table", entry);
+ Assert.assertEquals("Expected to find a replication entry that is open with infinite length", expectedStatus, Status.parseFrom(entry.getValue().get()));
// Try a couple of times to watch for the work record to be created
boolean notFound = true;
@@ -276,8 +299,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
Text expectedColqual = ReplicationTarget.toText(new ReplicationTarget("cluster1", "4"));
Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
notFound = false;
- } catch (NoSuchElementException e) {
- } catch (IllegalArgumentException e) {
+ } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) {
s = ReplicationTable.getScanner(conn);
for (Entry<Key,Value> content : s) {
log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());