You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2017/12/31 02:07:32 UTC

[accumulo] branch master updated: ACCUMULO-4751 Set createdTime for status records

This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 3161b98  ACCUMULO-4751 Set createdTime for status records
3161b98 is described below

commit 3161b98f05615844caf6980c8b5922375e92bd32
Author: Adam J. Shook <ad...@gmail.com>
AuthorDate: Thu Dec 14 09:36:59 2017 -0500

    ACCUMULO-4751 Set createdTime for status records
    
    Closes #339
---
 .../master/replication/ReplicationDriver.java      |   2 +-
 .../accumulo/master/replication/StatusMaker.java   |  40 +++++++-
 .../accumulo/test/replication/StatusMakerIT.java   | 108 ++++++++++++++++++++-
 3 files changed, 142 insertions(+), 8 deletions(-)

diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index 48a9591..79bb917 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -72,7 +72,7 @@ public class ReplicationDriver extends Daemon {
           continue;
         }
 
-        statusMaker = new StatusMaker(conn);
+        statusMaker = new StatusMaker(conn, master.getFileSystem());
         workMaker = new WorkMaker(master, conn);
         finishedWorkUpdater = new FinishedWorkUpdater(conn);
         rcrr = new RemoveCompleteReplicationRecords(conn);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index 7a41619..3b1e649 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.master.replication;
 
+import java.io.IOException;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -41,7 +42,9 @@ import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,12 +59,14 @@ public class StatusMaker {
   private static final Logger log = LoggerFactory.getLogger(StatusMaker.class);
 
   private final Connector conn;
+  private final VolumeManager fs;
 
   private BatchWriter replicationWriter, metadataWriter;
   private String sourceTableName = MetadataTable.NAME;
 
-  public StatusMaker(Connector conn) {
+  public StatusMaker(Connector conn, VolumeManager fs) {
     this.conn = conn;
+    this.fs = fs;
   }
 
   /**
@@ -198,8 +203,20 @@ public class StatusMaker {
   protected boolean addOrderRecord(Text file, Table.ID tableId, Status stat, Value value) {
     try {
       if (!stat.hasCreatedTime()) {
-        log.error("Status record ({}) for {} in table {} was written to metadata table which lacked createdTime", ProtobufUtil.toString(stat), file, tableId);
-        return false;
+        try {
+          // If the createdTime is not set, work around the issue by retrieving the WAL creation time
+          // from HDFS (or the current time if the WAL does not exist). See ACCUMULO-4751
+          long createdTime = setAndGetCreatedTime(new Path(file.toString()), tableId.toString());
+          stat = Status.newBuilder(stat).setCreatedTime(createdTime).build();
+          value = ProtobufUtil.toValue(stat);
+          log.debug("Status was lacking createdTime, set to {} for {}", createdTime, file);
+        } catch (IOException e) {
+          log.warn("Failed to get file status, will retry", e);
+          return false;
+        } catch (MutationsRejectedException e) {
+          log.warn("Failed to write status mutation for replication, will retry", e);
+          return false;
+        }
       }
 
       log.info("Creating order record for {} for {} with {}", file, tableId, ProtobufUtil.toString(stat));
@@ -254,4 +271,21 @@ public class StatusMaker {
       log.warn("Failed to delete status mutations for metadata table, will retry", e);
     }
   }
+  /** Chooses a createdTime for {@code file}, writes it as a Status for {@code tableId} through replicationWriter, and returns it. */
+  private long setAndGetCreatedTime(Path file, String tableId) throws IOException, MutationsRejectedException {
+    long createdTime;
+    if (fs.exists(file)) {
+      createdTime = fs.getFileStatus(file).getModificationTime(); // the file's modification time stands in for its creation time
+    } else {
+      createdTime = System.currentTimeMillis(); // fs reports no such file: fall back to the current time
+    }
+    // Row is the ReplicationSection prefix plus the file path, qualifier is the table id, value is a Status holding only createdTime
+    Status status = Status.newBuilder().setCreatedTime(createdTime).build();
+    Mutation m = new Mutation(new Text(ReplicationSection.getRowPrefix() + file.toString()));
+    m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId), ProtobufUtil.toValue(status));
+    replicationWriter.addMutation(m);
+    replicationWriter.flush(); // flush right away rather than leaving the mutation buffered in the writer
+
+    return createdTime;
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
index ad8d3a9..6a53cec 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.test.replication;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -41,11 +42,15 @@ import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.master.replication.StatusMaker;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,6 +61,7 @@ import com.google.common.collect.Sets;
 public class StatusMakerIT extends ConfigurableMacBase {
 
   private Connector conn;
+  private VolumeManager fs;
 
   @Before
   public void setupInstance() throws Exception {
@@ -63,6 +69,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
     ReplicationTable.setOnline(conn);
     conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
     conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+    fs = EasyMock.mock(VolumeManager.class);
   }
 
   @Test
@@ -92,7 +99,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
 
     bw.close();
 
-    StatusMaker statusMaker = new StatusMaker(conn);
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
     statusMaker.setSourceTableName(sourceTable);
 
     statusMaker.run();
@@ -137,7 +144,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
 
     bw.close();
 
-    StatusMaker statusMaker = new StatusMaker(conn);
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
     statusMaker.setSourceTableName(sourceTable);
 
     statusMaker.run();
@@ -173,7 +180,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
 
     bw.close();
 
-    StatusMaker statusMaker = new StatusMaker(conn);
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
     statusMaker.setSourceTableName(sourceTable);
 
     statusMaker.run();
@@ -218,7 +225,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
 
     bw.close();
 
-    StatusMaker statusMaker = new StatusMaker(conn);
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
     statusMaker.setSourceTableName(sourceTable);
 
     statusMaker.run();
@@ -247,4 +254,97 @@ public class StatusMakerIT extends ConfigurableMacBase {
     Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
     Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
   }
+  /** Status records written without a createdTime should still produce order records, stamped with the modification time reported by the mocked VolumeManager. */
+  @Test
+  public void orderRecordsCreatedWithNoCreatedTime() throws Exception {
+    String sourceTable = testName.getMethodName();
+    conn.tableOperations().create(sourceTable);
+    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+    List<String> files = Arrays.asList(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+        walPrefix + UUID.randomUUID());
+    Map<String,Long> fileToTableId = new HashMap<>();
+    // The builder below never calls setCreatedTime, so every Status written to the source table lacks a createdTime
+    Status.Builder statBuilder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true);
+
+    Map<String,Long> statuses = new HashMap<>();
+    long index = 1;
+    for (String file : files) {
+      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+      m.put(ReplicationSection.COLF, new Text(Long.toString(index)), ProtobufUtil.toValue(statBuilder.build()));
+      bw.addMutation(m);
+      fileToTableId.put(file, index);
+      // Mock a FileStatus whose modification time equals this file's index, and remember that value as the expected createdTime
+      FileStatus status = EasyMock.mock(FileStatus.class);
+      EasyMock.expect(status.getModificationTime()).andReturn(index);
+      EasyMock.replay(status);
+      statuses.put(file, index);
+      // The mocked VolumeManager reports the file as existing and hands back the mocked status
+      EasyMock.expect(fs.exists(new Path(file))).andReturn(true);
+      EasyMock.expect(fs.getFileStatus(new Path(file))).andReturn(status);
+
+      index++;
+    }
+
+    EasyMock.replay(fs);
+
+    bw.close();
+
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
+    statusMaker.setSourceTableName(sourceTable);
+
+    statusMaker.run();
+    // After the run, the source table's replication section is expected to be empty
+    Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+    s.fetchColumnFamily(ReplicationSection.COLF);
+    Assert.assertEquals(0, Iterables.size(s));
+    // Order records should now be present in the replication table, one per file, each with the expected createdTime
+    s = ReplicationTable.getScanner(conn);
+    OrderSection.limit(s);
+    Iterator<Entry<Key,Value>> iter = s.iterator();
+    Assert.assertTrue("Found no order records in replication table", iter.hasNext());
+
+    Iterator<String> expectedFiles = files.iterator();
+    Text buff = new Text();
+    while (expectedFiles.hasNext() && iter.hasNext()) {
+      String file = expectedFiles.next();
+      Entry<Key,Value> entry = iter.next();
+
+      Assert.assertEquals(file, OrderSection.getFile(entry.getKey(), buff));
+      OrderSection.getTableId(entry.getKey(), buff);
+      Assert.assertEquals(fileToTableId.get(file).intValue(), Integer.parseInt(buff.toString()));
+      Status status = Status.parseFrom(entry.getValue().get());
+      Assert.assertTrue(status.hasCreatedTime());
+      Assert.assertEquals((long) statuses.get(file), status.getCreatedTime());
+    }
+
+    Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
+    Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
+    // Repeat the check that the source table's replication section is empty
+    s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+    s.fetchColumnFamily(ReplicationSection.COLF);
+    Assert.assertEquals(0, Iterables.size(s));
+    // The entries in the replication table's ReplicationSection range should carry the expected createdTime as well
+    s = ReplicationTable.getScanner(conn);
+    s.setRange(ReplicationSection.getRange());
+    iter = s.iterator();
+    Assert.assertTrue("Found no stat records in replication table", iter.hasNext());
+    // Files are walked in sorted order here, presumably to line up with the scanner's row ordering (confirm)
+    Collections.sort(files);
+    expectedFiles = files.iterator();
+    while (expectedFiles.hasNext() && iter.hasNext()) {
+      String file = expectedFiles.next();
+      Entry<Key,Value> entry = iter.next();
+      Status status = Status.parseFrom(entry.getValue().get());
+      Assert.assertTrue(status.hasCreatedTime());
+      Assert.assertEquals((long) statuses.get(file), status.getCreatedTime());
+    }
+
+    Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
+    Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@accumulo.apache.org" <co...@accumulo.apache.org>'].