You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2017/12/31 02:07:32 UTC
[accumulo] branch master updated: ACCUMULO-4751 Set createdTime for status records
This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 3161b98 ACCUMULO-4751 Set createdTime for status records
3161b98 is described below
commit 3161b98f05615844caf6980c8b5922375e92bd32
Author: Adam J. Shook <ad...@gmail.com>
AuthorDate: Thu Dec 14 09:36:59 2017 -0500
ACCUMULO-4751 Set createdTime for status records
Closes #339
---
.../master/replication/ReplicationDriver.java | 2 +-
.../accumulo/master/replication/StatusMaker.java | 40 +++++++-
.../accumulo/test/replication/StatusMakerIT.java | 108 ++++++++++++++++++++-
3 files changed, 142 insertions(+), 8 deletions(-)
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index 48a9591..79bb917 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -72,7 +72,7 @@ public class ReplicationDriver extends Daemon {
continue;
}
- statusMaker = new StatusMaker(conn);
+ statusMaker = new StatusMaker(conn, master.getFileSystem());
workMaker = new WorkMaker(master, conn);
finishedWorkUpdater = new FinishedWorkUpdater(conn);
rcrr = new RemoveCompleteReplicationRecords(conn);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index 7a41619..3b1e649 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.master.replication;
+import java.io.IOException;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.AccumuloException;
@@ -41,7 +42,9 @@ import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,12 +59,14 @@ public class StatusMaker {
private static final Logger log = LoggerFactory.getLogger(StatusMaker.class);
private final Connector conn;
+ private final VolumeManager fs;
private BatchWriter replicationWriter, metadataWriter;
private String sourceTableName = MetadataTable.NAME;
- public StatusMaker(Connector conn) {
+ public StatusMaker(Connector conn, VolumeManager fs) {
this.conn = conn;
+ this.fs = fs;
}
/**
@@ -198,8 +203,20 @@ public class StatusMaker {
protected boolean addOrderRecord(Text file, Table.ID tableId, Status stat, Value value) {
try {
if (!stat.hasCreatedTime()) {
- log.error("Status record ({}) for {} in table {} was written to metadata table which lacked createdTime", ProtobufUtil.toString(stat), file, tableId);
- return false;
+ try {
+ // If the createdTime is not set, work around the issue by retrieving the WAL creation time
+ // from HDFS (or the current time if the WAL does not exist). See ACCUMULO-4751
+ long createdTime = setAndGetCreatedTime(new Path(file.toString()), tableId.toString());
+ stat = Status.newBuilder(stat).setCreatedTime(createdTime).build();
+ value = ProtobufUtil.toValue(stat);
+ log.debug("Status was lacking createdTime, set to {} for {}", createdTime, file);
+ } catch (IOException e) {
+ log.warn("Failed to get file status, will retry", e);
+ return false;
+ } catch (MutationsRejectedException e) {
+ log.warn("Failed to write status mutation for replication, will retry", e);
+ return false;
+ }
}
log.info("Creating order record for {} for {} with {}", file, tableId, ProtobufUtil.toString(stat));
@@ -254,4 +271,21 @@ public class StatusMaker {
log.warn("Failed to delete status mutations for metadata table, will retry", e);
}
}
+
+ private long setAndGetCreatedTime(Path file, String tableId) throws IOException, MutationsRejectedException { // Chooses a createdTime for the given file, writes it out as a Status record via replicationWriter, and returns the chosen value
+ long createdTime;
+ if (fs.exists(file)) { // NOTE(review): exists() and getFileStatus() are two separate calls; the file could presumably disappear in between -- confirm the resulting failure is an IOException that the caller's retry path handles
+ createdTime = fs.getFileStatus(file).getModificationTime(); // file is present: its modification time is used as the createdTime
+ } else {
+ createdTime = System.currentTimeMillis(); // file is absent: fall back to the current wall-clock time
+ }
+
+ Status status = Status.newBuilder().setCreatedTime(createdTime).build(); // Status with only createdTime set; no other field is populated here
+ Mutation m = new Mutation(new Text(ReplicationSection.getRowPrefix() + file.toString())); // row = ReplicationSection row prefix + file path; NOTE(review): this row is written through replicationWriter -- confirm that is the intended destination
+ m.put(MetadataSchema.ReplicationSection.COLF, new Text(tableId), ProtobufUtil.toValue(status)); // column family = ReplicationSection.COLF, qualifier = tableId, value = serialized Status
+ replicationWriter.addMutation(m);
+ replicationWriter.flush(); // flushed immediately after the mutation is added, before the value is returned
+
+ return createdTime;
+ }
}
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
index ad8d3a9..6a53cec 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
@@ -17,6 +17,7 @@
package org.apache.accumulo.test.replication;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -41,11 +42,15 @@ import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.master.replication.StatusMaker;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.util.ReplicationTableUtil;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -56,6 +61,7 @@ import com.google.common.collect.Sets;
public class StatusMakerIT extends ConfigurableMacBase {
private Connector conn;
+ private VolumeManager fs;
@Before
public void setupInstance() throws Exception {
@@ -63,6 +69,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
ReplicationTable.setOnline(conn);
conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+ fs = EasyMock.mock(VolumeManager.class);
}
@Test
@@ -92,7 +99,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
bw.close();
- StatusMaker statusMaker = new StatusMaker(conn);
+ StatusMaker statusMaker = new StatusMaker(conn, fs);
statusMaker.setSourceTableName(sourceTable);
statusMaker.run();
@@ -137,7 +144,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
bw.close();
- StatusMaker statusMaker = new StatusMaker(conn);
+ StatusMaker statusMaker = new StatusMaker(conn, fs);
statusMaker.setSourceTableName(sourceTable);
statusMaker.run();
@@ -173,7 +180,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
bw.close();
- StatusMaker statusMaker = new StatusMaker(conn);
+ StatusMaker statusMaker = new StatusMaker(conn, fs);
statusMaker.setSourceTableName(sourceTable);
statusMaker.run();
@@ -218,7 +225,7 @@ public class StatusMakerIT extends ConfigurableMacBase {
bw.close();
- StatusMaker statusMaker = new StatusMaker(conn);
+ StatusMaker statusMaker = new StatusMaker(conn, fs);
statusMaker.setSourceTableName(sourceTable);
statusMaker.run();
@@ -247,4 +254,97 @@ public class StatusMakerIT extends ConfigurableMacBase {
Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
}
+
+ @Test
+ public void orderRecordsCreatedWithNoCreatedTime() throws Exception { // Checks that records written without a createdTime end up with one taken from the mocked file status after StatusMaker runs
+ String sourceTable = testName.getMethodName();
+ conn.tableOperations().create(sourceTable);
+ ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+ BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+ String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+ List<String> files = Arrays.asList(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+ walPrefix + UUID.randomUUID());
+ Map<String,Long> fileToTableId = new HashMap<>();
+
+ Status.Builder statBuilder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true); // createdTime is deliberately never set on this builder
+
+ Map<String,Long> statuses = new HashMap<>(); // file -> modification time the mock will report for it
+ long index = 1;
+ for (String file : files) {
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+ m.put(ReplicationSection.COLF, new Text(Long.toString(index)), ProtobufUtil.toValue(statBuilder.build())); // qualifier is the loop index as text; the same index is recorded in fileToTableId below
+ bw.addMutation(m);
+ fileToTableId.put(file, index);
+
+ FileStatus status = EasyMock.mock(FileStatus.class);
+ EasyMock.expect(status.getModificationTime()).andReturn(index); // mocked modification time equals the loop index
+ EasyMock.replay(status);
+ statuses.put(file, index);
+
+ EasyMock.expect(fs.exists(new Path(file))).andReturn(true); // each file is reported as existing by the mocked VolumeManager
+ EasyMock.expect(fs.getFileStatus(new Path(file))).andReturn(status);
+
+ index++;
+ }
+
+ EasyMock.replay(fs); // all expectations recorded; switch the VolumeManager mock to replay mode
+
+ bw.close();
+
+ StatusMaker statusMaker = new StatusMaker(conn, fs);
+ statusMaker.setSourceTableName(sourceTable);
+
+ statusMaker.run(); // NOTE(review): EasyMock.verify(fs) is never called in this test, so an expectation on fs that is never exercised would not fail it
+
+ Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ s.fetchColumnFamily(ReplicationSection.COLF);
+ Assert.assertEquals(0, Iterables.size(s)); // the source table's replication section is expected to be empty after the run
+
+ s = ReplicationTable.getScanner(conn);
+ OrderSection.limit(s); // restrict the scan to order records
+ Iterator<Entry<Key,Value>> iter = s.iterator();
+ Assert.assertTrue("Found no order records in replication table", iter.hasNext());
+
+ Iterator<String> expectedFiles = files.iterator();
+ Text buff = new Text();
+ while (expectedFiles.hasNext() && iter.hasNext()) { // walk expected files and order records in lockstep
+ String file = expectedFiles.next();
+ Entry<Key,Value> entry = iter.next();
+
+ Assert.assertEquals(file, OrderSection.getFile(entry.getKey(), buff));
+ OrderSection.getTableId(entry.getKey(), buff);
+ Assert.assertEquals(fileToTableId.get(file).intValue(), Integer.parseInt(buff.toString()));
+ Status status = Status.parseFrom(entry.getValue().get());
+ Assert.assertTrue(status.hasCreatedTime());
+ Assert.assertEquals((long) statuses.get(file), status.getCreatedTime()); // createdTime must equal the mocked modification time for this file
+ }
+
+ Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
+ Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
+
+ s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ s.fetchColumnFamily(ReplicationSection.COLF);
+ Assert.assertEquals(0, Iterables.size(s)); // same emptiness check on the source table, repeated before inspecting the stat records
+
+ s = ReplicationTable.getScanner(conn);
+ s.setRange(ReplicationSection.getRange()); // this time scan the replication table over the ReplicationSection range
+ iter = s.iterator();
+ Assert.assertTrue("Found no stat records in replication table", iter.hasNext());
+
+ Collections.sort(files); // presumably so the expected order matches the scanner's row order -- TODO confirm
+ expectedFiles = files.iterator();
+ while (expectedFiles.hasNext() && iter.hasNext()) {
+ String file = expectedFiles.next();
+ Entry<Key,Value> entry = iter.next();
+ Status status = Status.parseFrom(entry.getValue().get());
+ Assert.assertTrue(status.hasCreatedTime());
+ Assert.assertEquals((long) statuses.get(file), status.getCreatedTime()); // the entry's key is not compared to file here; only the createdTime value is checked
+ }
+
+ Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
+ Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
+ }
}
--
To stop receiving notification emails like this one, please contact
commits@accumulo.apache.org.