You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2017/12/31 02:07:41 UTC

[GitHub] asfgit closed pull request #339: [ACCUMULO-4751] Set createdTime for status records

asfgit closed pull request #339: [ACCUMULO-4751] Set createdTime for status records
URL: https://github.com/apache/accumulo/pull/339
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index 72b3bbd66e..37d168eafc 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -72,7 +72,7 @@ public void run() {
           continue;
         }
 
-        statusMaker = new StatusMaker(conn);
+        statusMaker = new StatusMaker(conn, master.getFileSystem());
         workMaker = new WorkMaker(master, conn);
         finishedWorkUpdater = new FinishedWorkUpdater(conn);
         rcrr = new RemoveCompleteReplicationRecords(conn);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index 4a0ed521e5..75a9a297bc 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.master.replication;
 
+import java.io.IOException;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -40,7 +41,9 @@
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,12 +58,14 @@
   private static final Logger log = LoggerFactory.getLogger(StatusMaker.class);
 
   private final Connector conn;
+  private final VolumeManager fs;
 
   private BatchWriter replicationWriter, metadataWriter;
   private String sourceTableName = MetadataTable.NAME;
 
-  public StatusMaker(Connector conn) {
+  public StatusMaker(Connector conn, VolumeManager fs) {
     this.conn = conn;
+    this.fs = fs;
   }
 
   /**
@@ -197,8 +202,20 @@ protected boolean addStatusRecord(Text file, String tableId, Value v) {
   protected boolean addOrderRecord(Text file, String tableId, Status stat, Value value) {
     try {
       if (!stat.hasCreatedTime()) {
-        log.error("Status record ({}) for {} in table {} was written to metadata table which lacked createdTime", ProtobufUtil.toString(stat), file, tableId);
-        return false;
+        try {
+          // If the createdTime is not set, work around the issue by retrieving the WAL creation time
+          // from HDFS (or the current time if the WAL does not exist).  See ACCUMULO-4751
+          long createdTime = setAndGetCreatedTime(new Path(file.toString()), tableId);
+          stat = Status.newBuilder(stat).setCreatedTime(createdTime).build();
+          value = ProtobufUtil.toValue(stat);
+          log.debug("Status was lacking createdTime, set to {} for {}", createdTime, file);
+        } catch (IOException e) {
+          log.warn("Failed to get file status, will retry", e);
+          return false;
+        } catch (MutationsRejectedException e) {
+          log.warn("Failed to write status mutation for replication, will retry", e);
+          return false;
+        }
       }
 
       log.info("Creating order record for {} for {} with {}", file, tableId, ProtobufUtil.toString(stat));
@@ -253,4 +270,21 @@ protected void deleteStatusRecord(Key k) {
       log.warn("Failed to delete status mutations for metadata table, will retry", e);
     }
   }
+
+  private long setAndGetCreatedTime(Path file, String tableId) throws IOException, MutationsRejectedException {
+    long createdTime;
+    if (fs.exists(file)) {
+      createdTime = fs.getFileStatus(file).getModificationTime();
+    } else {
+      createdTime = System.currentTimeMillis();
+    }
+
+    Status status = Status.newBuilder().setCreatedTime(createdTime).build();
+    Mutation m = new Mutation(new Text(ReplicationSection.getRowPrefix() + file.toString()));
+    m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId), ProtobufUtil.toValue(status));
+    replicationWriter.addMutation(m);
+    replicationWriter.flush();
+
+    return createdTime;
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
index cd57ae1074..c7f8516936 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.test.replication;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -40,11 +41,15 @@
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.master.replication.StatusMaker;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,6 +60,7 @@
 public class StatusMakerIT extends ConfigurableMacBase {
 
   private Connector conn;
+  private VolumeManager fs;
 
   @Before
   public void setupInstance() throws Exception {
@@ -62,6 +68,7 @@ public void setupInstance() throws Exception {
     ReplicationTable.setOnline(conn);
     conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
     conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+    fs = EasyMock.mock(VolumeManager.class);
   }
 
   @Test
@@ -91,7 +98,7 @@ public void statusRecordsCreated() throws Exception {
 
     bw.close();
 
-    StatusMaker statusMaker = new StatusMaker(conn);
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
     statusMaker.setSourceTableName(sourceTable);
 
     statusMaker.run();
@@ -136,7 +143,7 @@ public void openMessagesAreNotDeleted() throws Exception {
 
     bw.close();
 
-    StatusMaker statusMaker = new StatusMaker(conn);
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
     statusMaker.setSourceTableName(sourceTable);
 
     statusMaker.run();
@@ -172,7 +179,7 @@ public void closedMessagesAreDeleted() throws Exception {
 
     bw.close();
 
-    StatusMaker statusMaker = new StatusMaker(conn);
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
     statusMaker.setSourceTableName(sourceTable);
 
     statusMaker.run();
@@ -217,7 +224,7 @@ public void closedMessagesCreateOrderRecords() throws Exception {
 
     bw.close();
 
-    StatusMaker statusMaker = new StatusMaker(conn);
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
     statusMaker.setSourceTableName(sourceTable);
 
     statusMaker.run();
@@ -246,4 +253,97 @@ public void closedMessagesCreateOrderRecords() throws Exception {
     Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
     Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
   }
+
+  @Test
+  public void orderRecordsCreatedWithNoCreatedTime() throws Exception {
+    String sourceTable = testName.getMethodName();
+    conn.tableOperations().create(sourceTable);
+    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+    List<String> files = Arrays.asList(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+        walPrefix + UUID.randomUUID());
+    Map<String,Long> fileToTableId = new HashMap<>();
+
+    Status.Builder statBuilder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true);
+
+    Map<String,Long> statuses = new HashMap<>();
+    long index = 1;
+    for (String file : files) {
+      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+      m.put(ReplicationSection.COLF, new Text(Long.toString(index)), ProtobufUtil.toValue(statBuilder.build()));
+      bw.addMutation(m);
+      fileToTableId.put(file, index);
+
+      FileStatus status = EasyMock.mock(FileStatus.class);
+      EasyMock.expect(status.getModificationTime()).andReturn(index);
+      EasyMock.replay(status);
+      statuses.put(file, index);
+
+      EasyMock.expect(fs.exists(new Path(file))).andReturn(true);
+      EasyMock.expect(fs.getFileStatus(new Path(file))).andReturn(status);
+
+      index++;
+    }
+
+    EasyMock.replay(fs);
+
+    bw.close();
+
+    StatusMaker statusMaker = new StatusMaker(conn, fs);
+    statusMaker.setSourceTableName(sourceTable);
+
+    statusMaker.run();
+
+    Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+    s.fetchColumnFamily(ReplicationSection.COLF);
+    Assert.assertEquals(0, Iterables.size(s));
+
+    s = ReplicationTable.getScanner(conn);
+    OrderSection.limit(s);
+    Iterator<Entry<Key,Value>> iter = s.iterator();
+    Assert.assertTrue("Found no order records in replication table", iter.hasNext());
+
+    Iterator<String> expectedFiles = files.iterator();
+    Text buff = new Text();
+    while (expectedFiles.hasNext() && iter.hasNext()) {
+      String file = expectedFiles.next();
+      Entry<Key,Value> entry = iter.next();
+
+      Assert.assertEquals(file, OrderSection.getFile(entry.getKey(), buff));
+      OrderSection.getTableId(entry.getKey(), buff);
+      Assert.assertEquals(fileToTableId.get(file).intValue(), Integer.parseInt(buff.toString()));
+      Status status = Status.parseFrom(entry.getValue().get());
+      Assert.assertTrue(status.hasCreatedTime());
+      Assert.assertEquals((long) statuses.get(file), status.getCreatedTime());
+    }
+
+    Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
+    Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
+
+    s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+    s.fetchColumnFamily(ReplicationSection.COLF);
+    Assert.assertEquals(0, Iterables.size(s));
+
+    s = ReplicationTable.getScanner(conn);
+    s.setRange(ReplicationSection.getRange());
+    iter = s.iterator();
+    Assert.assertTrue("Found no stat records in replication table", iter.hasNext());
+
+    Collections.sort(files);
+    expectedFiles = files.iterator();
+    while (expectedFiles.hasNext() && iter.hasNext()) {
+      String file = expectedFiles.next();
+      Entry<Key,Value> entry = iter.next();
+      Status status = Status.parseFrom(entry.getValue().get());
+      Assert.assertTrue(status.hasCreatedTime());
+      Assert.assertEquals((long) statuses.get(file), status.getCreatedTime());
+    }
+
+    Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
+    Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services