You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:28:44 UTC

[06/50] [abbrv] git commit: ACCUMULO-378 Fix up some tests that failed on jenkins and remove now unnecessary test

ACCUMULO-378 Fix up some tests that failed on jenkins and remove now unnecessary test


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f247c8e2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f247c8e2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f247c8e2

Branch: refs/heads/ACCUMULO-378
Commit: f247c8e251abb99765ed28b98873f22152d52f58
Parents: 91396e5
Author: Josh Elser <el...@apache.org>
Authored: Wed Apr 30 17:30:44 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Apr 30 17:30:44 2014 -0400

----------------------------------------------------------------------
 .../replication/ReplicationDeadlockTest.java    |   3 -
 .../ReplicationTableTimestampIT.java            | 245 -------------------
 .../test/replication/ReplicationWithGCIT.java   |   2 +-
 .../replication/ReplicationWithMakerTest.java   |  40 ++-
 4 files changed, 32 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
index 9713c8c..d43aa32 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
@@ -17,7 +17,6 @@
 package org.apache.accumulo.test.replication;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -36,9 +35,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java
deleted file mode 100644
index 3111164..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Integration Tests that attempt to evaluate the accuracy of the internal bookkeeping performed on the accumulo "master" instance. Does not send data to any
- * remote instance, merely tracks what is stored locally.
- */
-public class ReplicationTableTimestampIT extends ConfigurableMacIT {
-  @Override
-  public int defaultTimeoutSeconds() {
-    return 300;
-  }
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0s");
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
-    cfg.setNumTservers(1);
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-  }
-
-  private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
-    Multimap<String,String> logs = HashMultimap.create();
-    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    scanner.fetchColumnFamily(LogColumnFamily.NAME);
-    scanner.setRange(new Range());
-    for (Entry<Key,Value> entry : scanner) {
-      if (Thread.interrupted()) {
-        return logs;
-      }
-
-      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
-      for (String log : logEntry.logSet) {
-        logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
-      }
-    }
-    return logs;
-  }
-
-  @Test
-  public void closedReplicationStatusStayClosed() throws Exception {
-    final Connector conn = getConnector();
-    String table1 = "table1", table2 = "table2", table3 = "table3";
-    final Multimap<String,String> metadataTableWals = HashMultimap.create();
-    final AtomicBoolean keepRunning = new AtomicBoolean(true);
-
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
-        // when that happens
-        while (keepRunning.get()) {
-          try {
-            metadataTableWals.putAll(getLogs(conn));
-          } catch (TableNotFoundException e) {
-            log.error("Metadata table doesn't exist");
-          }
-        }
-      }
-    
-    });
-
-    t.start();
-
-    conn.tableOperations().create(table1);
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    Thread.sleep(1000);
-
-    // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    conn.tableOperations().create(table2);
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    Thread.sleep(1000);
-
-    // Write some data to table2
-    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    conn.tableOperations().create(table3);
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    Thread.sleep(1000);
-
-    // Write some data to table3
-    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    // Stop the thread which is constantly scanning metadata to track all WALs seen
-    keepRunning.set(false);
-    t.join(5000);
-
-    // See which files we have for replication in the replication table
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Set<String> replFiles = new HashSet<>();
-    for (Entry<Key,Value> entry : s) {
-      replFiles.add(entry.getKey().getRow().toString());
-    }
-
-    // We might have a WAL that was used solely for the replication table
-    // We want to remove that from our list as it should not appear in the replication table
-    String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
-    Iterator<Entry<String,String>> observedLogs = metadataTableWals.entries().iterator();
-    while (observedLogs.hasNext()) {
-      Entry<String,String> observedLog = observedLogs.next();
-      if (replicationTableId.equals(observedLog.getValue())) {
-        observedLogs.remove();
-      }
-    }
-
-    // We should have *some* reference to each log that was seen in the metadata table
-    // They might not yet all be closed though (might be newfile)
-    Assert.assertEquals("Metadata log distribution: " + metadataTableWals, metadataTableWals.keySet(), replFiles);
-
-    LinkedListMultimap<String,Entry<Key,Value>> kvByRow = LinkedListMultimap.create();
-    s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    for (Entry<Key,Value> entry : s) {
-      kvByRow.put(entry.getKey().getRow().toString(), entry);
-    }
-
-    for (String row : kvByRow.keySet()) {
-      ArrayList<Entry<Key,Value>> kvs = new ArrayList<>(kvByRow.get(row));
-      Collections.sort(kvs, new Comparator<Entry<Key,Value>>() {
-        @Override
-        public int compare(Entry<Key,Value> o1, Entry<Key,Value> o2) {
-          return (new Long(o1.getKey().getTimestamp())).compareTo(new Long(o2.getKey().getTimestamp()));
-        }
-      });
-
-      Key closedKey = null;
-      boolean observedClosed = false;
-      for (Entry<Key,Value> kv : kvs) {
-        Status status = Status.parseFrom(kv.getValue().get());
-
-        // Once we get a closed record, every subsequent record should *also* be closed
-        // A file cannot be "re-opened"
-        if (!observedClosed) {
-          if (status.getClosed()) {
-            closedKey = kv.getKey();
-            observedClosed = true;
-          }
-        } else {
-          Assert.assertTrue("Found a non-closed Status (" + kv.getKey().toStringNoTruncate() + ") after a closed Status (" + closedKey.toStringNoTruncate() + ") was observed", status.getClosed());
-        }
-      }
-      
-    }
-
-    for (String replFile : replFiles) {
-      Path p = new Path(replFile);
-      FileSystem fs = p.getFileSystem(new Configuration());
-      Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 3f6b8dc..449827b 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -496,7 +496,7 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
     s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     s.setRange(ReplicationSection.getRange());
     recordsFound = 0;
-    for (@SuppressWarnings("unused") Entry<Key,Value> entry : s) {
+    for (Entry<Key,Value> entry : s) {
       recordsFound++;
       log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
index 1ffb2a2..1c056ed 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
@@ -33,7 +33,9 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
@@ -58,12 +60,20 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
     cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    // Run the process in the master which writes replication records from metadata to replication
+    // repeatedly without pause
     cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
     cfg.setNumTservers(1);
   }
 
   @Test
   public void singleTableSingleTarget() throws Exception {
+    // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
+    // against expected Status messages.
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+    }
+    
     Connector conn = getConnector();
     String table1 = "table1";
 
@@ -80,7 +90,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
         conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
         // Replicate table1 to cluster1 in the table with id of '4'
         conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
-        attempts = 0;
+        break;
       } catch (Exception e) {
         attempts--;
         if (attempts <= 0) {
@@ -113,26 +123,39 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
         attempts--;
       }
     } while (!exists && attempts > 0);
-    Assert.assertTrue("Replication table did not exist", exists);
+    Assert.assertTrue("Replication table was never created", exists);
 
     // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
     for (int i = 0; i < 5 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
       UtilWaitThread.sleep(1000);
     }
 
-    Assert.assertTrue("Did not find expected combiner", conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
+    Assert.assertTrue("Combiner was never set on replication table",
+        conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
+
+    // Trigger the minor compaction, waiting for it to finish.
+    // This should write the entry to metadata that the file has data
+    conn.tableOperations().flush(ReplicationTable.NAME, null, null, true);
 
     // Make sure that we have one status element, should be a new file
     Scanner s = ReplicationTable.getScanner(conn);
     StatusSection.limit(s);
     Entry<Key,Value> entry = null;
     attempts = 5;
+    // This record will move from new to new with infinite length because of the minc (flush)
+    Status expectedStatus = StatusUtil.openWithUnknownLength();
     while (null == entry && attempts > 0) {
-      try{
+      try {
         entry = Iterables.getOnlyElement(s);
+        if (!expectedStatus.equals(Status.parseFrom(entry.getValue().get()))) {
+          entry = null;
+          // the master process didn't yet fire and write the new mutation, wait for it to do
+          // so and try to read it again
+          Thread.sleep(1000);
+        }
       } catch (NoSuchElementException e) {
         entry = null;
-        Thread.sleep(200);
+        Thread.sleep(500);
       } catch (IllegalArgumentException e) {
         // saw this contain 2 elements once
         s = ReplicationTable.getScanner(conn);
@@ -146,8 +169,8 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
       }
     }
 
-    Assert.assertNotNull(entry);
-    Assert.assertEquals(StatusUtil.openWithUnknownLength(), Status.parseFrom(entry.getValue().get()));
+    Assert.assertNotNull("Could not find expected entry in replication table", entry);
+    Assert.assertEquals("Expected to find a replication entry that is open with infinite length", expectedStatus, Status.parseFrom(entry.getValue().get()));
 
     // Try a couple of times to watch for the work record to be created
     boolean notFound = true;
@@ -276,8 +299,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
         Text expectedColqual = ReplicationTarget.toText(new ReplicationTarget("cluster1", "4"));
         Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
         notFound = false;
-      } catch (NoSuchElementException e) {
-      } catch (IllegalArgumentException e) {
+      } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) {
         s = ReplicationTable.getScanner(conn);
         for (Entry<Key,Value> content : s) {
           log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());