You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 03:59:58 UTC
[39/50] [abbrv] git commit: ACCUMULO-378 Try to write some better
tests for getting WALs closed for replication
ACCUMULO-378 Try to write some better tests for getting WALs closed for replication
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dfa0fd34
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dfa0fd34
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dfa0fd34
Branch: refs/heads/ACCUMULO-378
Commit: dfa0fd34a4caea3bce7f2ed7902703f246f5fd68
Parents: a4b46d9
Author: Josh Elser <el...@apache.org>
Authored: Mon May 19 20:37:42 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 19 20:37:42 2014 -0400
----------------------------------------------------------------------
.../ReplicationFilesClosedAfterUnusedTest.java | 172 +++++++++++++++++++
.../replication/ReplicationSourceOnlyIT.java | 1 -
2 files changed, 172 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa0fd34/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
new file mode 100644
index 0000000..eb89317
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class ReplicationFilesClosedAfterUnusedTest extends ConfigurableMacIT {
+ private static final Logger log = LoggerFactory.getLogger(ReplicationFilesClosedAfterUnusedTest.class);
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
+ cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0s");
+ cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "0s");
+ cfg.setProperty(Property.REPLICATION_NAME, "master");
+ cfg.setNumTservers(1);
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test(timeout = 60000)
+ public void test() throws Exception {
+ Connector conn = getConnector();
+
+ String table = "table";
+ conn.tableOperations().create(table);
+ String tableId = conn.tableOperations().tableIdMap().get(table);
+
+ Assert.assertNotNull(tableId);
+
+ log.info("Writing to {}", tableId);
+
+ conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+ conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+ // just sleep
+ conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+ ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
+
+ // Write a mutation to make a log file
+ BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ Mutation m = new Mutation("one");
+ m.put("", "", "");
+ bw.addMutation(m);
+ bw.close();
+
+ // Write another to make sure the logger rolls itself?
+ bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ m = new Mutation("three");
+ m.put("", "", "");
+ bw.addMutation(m);
+ bw.close();
+
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+ s.setRange(TabletsSection.getRange(tableId));
+ Set<String> wals = new HashSet<>();
+ for (Entry<Key,Value> entry : s) {
+ LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+ for (String file : logEntry.logSet) {
+ Path p = new Path(file);
+ wals.add(p.toString());
+ }
+ }
+
+ log.warn("Found wals {}", wals);
+
+ // for (int j = 0; j < 5; j++) {
+ bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ m = new Mutation("three");
+ byte[] bytes = new byte[1024 * 1024];
+ m.put("1".getBytes(), new byte[0], bytes);
+ m.put("2".getBytes(), new byte[0], bytes);
+ m.put("3".getBytes(), new byte[0], bytes);
+ m.put("4".getBytes(), new byte[0], bytes);
+ m.put("5".getBytes(), new byte[0], bytes);
+ bw.addMutation(m);
+ bw.close();
+
+ conn.tableOperations().flush(table, null, null, true);
+
+ while (!conn.tableOperations().exists(ReplicationTable.NAME)) {
+ UtilWaitThread.sleep(500);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.fetchColumnFamily(LogColumnFamily.NAME);
+ s.setRange(TabletsSection.getRange(tableId));
+ for (Entry<Key,Value> entry : s) {
+ log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
+ }
+
+ s = ReplicationTable.getScanner(conn);
+ StatusSection.limit(s);
+ Text buff = new Text();
+ boolean allReferencedLogsClosed = true;
+ int recordsFound = 0;
+ for (Entry<Key,Value> e : s) {
+ recordsFound++;
+ allReferencedLogsClosed = true;
+ StatusSection.getFile(e.getKey(), buff);
+ String file = buff.toString();
+ if (wals.contains(file)) {
+ Status stat = Status.parseFrom(e.getValue().get());
+ if (!stat.getClosed()) {
+ log.info("{} wasn't closed", file);
+ allReferencedLogsClosed = false;
+ }
+ }
+ }
+
+ if (recordsFound > 0 && allReferencedLogsClosed) {
+ return;
+ }
+
+ Thread.sleep(1000);
+ }
+
+ Assert.fail("We had a file that was referenced but didn't get closed");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfa0fd34/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
index 9d9021e..81ae5ca 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
@@ -67,7 +67,6 @@ public class ReplicationSourceOnlyIT extends ConfigurableMacIT {
cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
cfg.setProperty(Property.GC_CYCLE_START, "1s");
cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
- // cfg.useMiniDFS(true);
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}