You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2014/11/24 23:37:55 UTC
hadoop git commit: YARN-1984. LeveldbTimelineStore does not handle db
exceptions properly. Contributed by Varun Saxena
Repository: hadoop
Updated Branches:
refs/heads/trunk 2967c17fe -> 1ce4d33c2
YARN-1984. LeveldbTimelineStore does not handle db exceptions properly. Contributed by Varun Saxena
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1ce4d33c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1ce4d33c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1ce4d33c
Branch: refs/heads/trunk
Commit: 1ce4d33c2dc86d711b227a04d2f9a2ab696a24a1
Parents: 2967c17
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Nov 24 22:36:59 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Nov 24 22:36:59 2014 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/timeline/LeveldbTimelineStore.java | 206 +++++++++++--------
.../timeline/TestLeveldbTimelineStore.java | 9 +-
3 files changed, 126 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ce4d33c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6a29d19..5d1e107 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -135,6 +135,9 @@ Release 2.7.0 - UNRELEASED
YARN-2315. FairScheduler: Set current capacity in addition to capacity.
(Zhihai Xu via kasha)
+ YARN-1984. LeveldbTimelineStore does not handle db exceptions properly
+ (Varun Saxena via jlowe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ce4d33c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
index c4ea996..33deb80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
@@ -66,10 +66,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
-import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions;
import org.iq80.leveldb.WriteBatch;
@@ -438,13 +438,15 @@ public class LeveldbTimelineStore extends AbstractService
.add(entityType).add(writeReverseOrderedLong(revStartTime))
.add(entityId).getBytesForLookup();
- DBIterator iterator = null;
+ LeveldbIterator iterator = null;
try {
- iterator = db.iterator();
+ iterator = new LeveldbIterator(db);
iterator.seek(prefix);
return getEntity(entityId, entityType, revStartTime, fields, iterator,
prefix, prefix.length);
+ } catch(DBException e) {
+ throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, iterator);
}
@@ -455,7 +457,7 @@ public class LeveldbTimelineStore extends AbstractService
* specified fields for this entity, return null.
*/
private static TimelineEntity getEntity(String entityId, String entityType,
- Long startTime, EnumSet<Field> fields, DBIterator iterator,
+ Long startTime, EnumSet<Field> fields, LeveldbIterator iterator,
byte[] prefix, int prefixlen) throws IOException {
if (fields == null) {
fields = EnumSet.allOf(Field.class);
@@ -562,7 +564,7 @@ public class LeveldbTimelineStore extends AbstractService
o2.length);
}
});
- DBIterator iterator = null;
+ LeveldbIterator iterator = null;
try {
// look up start times for the specified entities
// skip entities with no start time
@@ -606,7 +608,7 @@ public class LeveldbTimelineStore extends AbstractService
if (limit == null) {
limit = DEFAULT_LIMIT;
}
- iterator = db.iterator();
+ iterator = new LeveldbIterator(db);
for (iterator.seek(first); entity.getEvents().size() < limit &&
iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
@@ -623,6 +625,8 @@ public class LeveldbTimelineStore extends AbstractService
}
}
}
+ } catch(DBException e) {
+ throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, iterator);
}
@@ -683,7 +687,7 @@ public class LeveldbTimelineStore extends AbstractService
String entityType, Long limit, Long starttime, Long endtime,
String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields) throws IOException {
- DBIterator iterator = null;
+ LeveldbIterator iterator = null;
try {
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
// only db keys matching the prefix (base + entity type) will be parsed
@@ -724,7 +728,7 @@ public class LeveldbTimelineStore extends AbstractService
}
TimelineEntities entities = new TimelineEntities();
- iterator = db.iterator();
+ iterator = new LeveldbIterator(db);
iterator.seek(first);
// iterate until one of the following conditions is met: limit is
// reached, there are no more keys, the key prefix no longer matches,
@@ -783,10 +787,23 @@ public class LeveldbTimelineStore extends AbstractService
}
}
return entities;
+ } catch(DBException e) {
+ throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, iterator);
}
}
+
+ /**
+ * Handle error and set it in response.
+ */
+ private static void handleError(TimelineEntity entity, TimelinePutResponse response, final int errorCode) {
+ TimelinePutError error = new TimelinePutError();
+ error.setEntityId(entity.getEntityId());
+ error.setEntityType(entity.getEntityType());
+ error.setErrorCode(errorCode);
+ response.addError(error);
+ }
/**
* Put a single entity. If there is an error, add a TimelinePutError to the
@@ -812,11 +829,7 @@ public class LeveldbTimelineStore extends AbstractService
entity.getStartTime(), events);
if (startAndInsertTime == null) {
// if no start time is found, add an error and return
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entity.getEntityId());
- error.setEntityType(entity.getEntityType());
- error.setErrorCode(TimelinePutError.NO_START_TIME);
- response.addError(error);
+ handleError(entity, response, TimelinePutError.NO_START_TIME);
return;
}
revStartTime = writeReverseOrderedLong(startAndInsertTime
@@ -883,11 +896,7 @@ public class LeveldbTimelineStore extends AbstractService
if (!domainId.equals(entity.getDomainId())) {
// in this case the entity will be put, but the relation will be
// ignored
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entity.getEntityId());
- error.setEntityType(entity.getEntityType());
- error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
- response.addError(error);
+ handleError(entity, response, TimelinePutError.FORBIDDEN_RELATION);
continue;
}
}
@@ -933,11 +942,7 @@ public class LeveldbTimelineStore extends AbstractService
if (entity.getDomainId() == null ||
entity.getDomainId().length() == 0) {
if (!allowEmptyDomainId) {
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entity.getEntityId());
- error.setEntityType(entity.getEntityType());
- error.setErrorCode(TimelinePutError.NO_DOMAIN);
- response.addError(error);
+ handleError(entity, response, TimelinePutError.NO_DOMAIN);
return;
}
} else {
@@ -946,14 +951,14 @@ public class LeveldbTimelineStore extends AbstractService
entity.getDomainId().getBytes());
}
db.write(writeBatch);
+ } catch (DBException de) {
+ LOG.error("Error putting entity " + entity.getEntityId() +
+ " of type " + entity.getEntityType(), de);
+ handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} catch (IOException e) {
LOG.error("Error putting entity " + entity.getEntityId() +
" of type " + entity.getEntityType(), e);
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entity.getEntityId());
- error.setEntityType(entity.getEntityType());
- error.setErrorCode(TimelinePutError.IO_EXCEPTION);
- response.addError(error);
+ handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} finally {
lock.unlock();
writeLocks.returnLock(lock);
@@ -983,15 +988,16 @@ public class LeveldbTimelineStore extends AbstractService
relatedEntity.getType(), relatedEntityStartTime),
writeReverseOrderedLong(relatedEntityStartAndInsertTime
.insertTime));
+ } catch (DBException de) {
+ LOG.error("Error putting related entity " + relatedEntity.getId() +
+ " of type " + relatedEntity.getType() + " for entity " +
+ entity.getEntityId() + " of type " + entity.getEntityType(), de);
+ handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} catch (IOException e) {
LOG.error("Error putting related entity " + relatedEntity.getId() +
" of type " + relatedEntity.getType() + " for entity " +
entity.getEntityId() + " of type " + entity.getEntityType(), e);
- TimelinePutError error = new TimelinePutError();
- error.setEntityId(entity.getEntityId());
- error.setEntityType(entity.getEntityType());
- error.setErrorCode(TimelinePutError.IO_EXCEPTION);
- response.addError(error);
+ handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} finally {
lock.unlock();
writeLocks.returnLock(lock);
@@ -1072,23 +1078,27 @@ public class LeveldbTimelineStore extends AbstractService
private Long getStartTimeLong(String entityId, String entityType)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
- // start time is not provided, so try to look it up
- if (startTimeReadCache.containsKey(entity)) {
- // found the start time in the cache
- return startTimeReadCache.get(entity);
- } else {
- // try to look up the start time in the db
- byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
- byte[] v = db.get(b);
- if (v == null) {
- // did not find the start time in the db
- return null;
+ try {
+ // start time is not provided, so try to look it up
+ if (startTimeReadCache.containsKey(entity)) {
+ // found the start time in the cache
+ return startTimeReadCache.get(entity);
} else {
- // found the start time in the db
- Long l = readReverseOrderedLong(v, 0);
- startTimeReadCache.put(entity, l);
- return l;
+ // try to look up the start time in the db
+ byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+ byte[] v = db.get(b);
+ if (v == null) {
+ // did not find the start time in the db
+ return null;
+ } else {
+ // found the start time in the db
+ Long l = readReverseOrderedLong(v, 0);
+ startTimeReadCache.put(entity, l);
+ return l;
+ }
}
+ } catch(DBException e) {
+ throw new IOException(e);
}
}
@@ -1152,28 +1162,32 @@ public class LeveldbTimelineStore extends AbstractService
StartAndInsertTime startAndInsertTime = null;
// create lookup key for start time
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
- // retrieve value for key
- byte[] v = db.get(b);
- if (v == null) {
- // start time doesn't exist in db
- if (suggestedStartTime == null) {
- return null;
+ try {
+ // retrieve value for key
+ byte[] v = db.get(b);
+ if (v == null) {
+ // start time doesn't exist in db
+ if (suggestedStartTime == null) {
+ return null;
+ }
+ startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
+ System.currentTimeMillis());
+
+ // write suggested start time
+ v = new byte[16];
+ writeReverseOrderedLong(suggestedStartTime, v, 0);
+ writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
+ WriteOptions writeOptions = new WriteOptions();
+ writeOptions.sync(true);
+ db.put(b, v, writeOptions);
+ } else {
+ // found start time in db, so ignore suggested start time
+ startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
+ readReverseOrderedLong(v, 8));
}
- startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
- System.currentTimeMillis());
-
- // write suggested start time
- v = new byte[16];
- writeReverseOrderedLong(suggestedStartTime, v, 0);
- writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
- WriteOptions writeOptions = new WriteOptions();
- writeOptions.sync(true);
- db.put(b, v, writeOptions);
- } else {
- // found start time in db, so ignore suggested start time
- startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
- readReverseOrderedLong(v, 8));
- }
+ } catch(DBException e) {
+ throw new IOException(e);
+ }
startTimeWriteCache.put(entity, startAndInsertTime);
startTimeReadCache.put(entity, startAndInsertTime.startTime);
return startAndInsertTime;
@@ -1373,7 +1387,7 @@ public class LeveldbTimelineStore extends AbstractService
@VisibleForTesting
List<String> getEntityTypes() throws IOException {
- DBIterator iterator = null;
+ LeveldbIterator iterator = null;
try {
iterator = getDbIterator(false);
List<String> entityTypes = new ArrayList<String>();
@@ -1396,6 +1410,8 @@ public class LeveldbTimelineStore extends AbstractService
iterator.seek(lookupKey);
}
return entityTypes;
+ } catch(DBException e) {
+ throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, iterator);
}
@@ -1406,7 +1422,7 @@ public class LeveldbTimelineStore extends AbstractService
* the given write batch.
*/
private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
- DBIterator iterator) {
+ LeveldbIterator iterator) {
for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)) {
@@ -1418,7 +1434,7 @@ public class LeveldbTimelineStore extends AbstractService
@VisibleForTesting
boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
- DBIterator iterator, DBIterator pfIterator, boolean seeked)
+ LeveldbIterator iterator, LeveldbIterator pfIterator, boolean seeked)
throws IOException {
WriteBatch writeBatch = null;
try {
@@ -1524,6 +1540,8 @@ public class LeveldbTimelineStore extends AbstractService
writeOptions.sync(true);
db.write(writeBatch, writeOptions);
return true;
+ } catch(DBException e) {
+ throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, writeBatch);
}
@@ -1542,8 +1560,8 @@ public class LeveldbTimelineStore extends AbstractService
try {
List<String> entityTypes = getEntityTypes();
for (String entityType : entityTypes) {
- DBIterator iterator = null;
- DBIterator pfIterator = null;
+ LeveldbIterator iterator = null;
+ LeveldbIterator pfIterator = null;
long typeCount = 0;
try {
deleteLock.writeLock().lock();
@@ -1583,21 +1601,25 @@ public class LeveldbTimelineStore extends AbstractService
}
@VisibleForTesting
- DBIterator getDbIterator(boolean fillCache) {
+ LeveldbIterator getDbIterator(boolean fillCache) {
ReadOptions readOptions = new ReadOptions();
readOptions.fillCache(fillCache);
- return db.iterator(readOptions);
+ return new LeveldbIterator(db, readOptions);
}
Version loadVersion() throws IOException {
- byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
- // if version is not stored previously, treat it as 1.0.
- if (data == null || data.length == 0) {
- return Version.newInstance(1, 0);
+ try {
+ byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
+ // if version is not stored previously, treat it as 1.0.
+ if (data == null || data.length == 0) {
+ return Version.newInstance(1, 0);
+ }
+ Version version =
+ new VersionPBImpl(VersionProto.parseFrom(data));
+ return version;
+ } catch(DBException e) {
+ throw new IOException(e);
}
- Version version =
- new VersionPBImpl(VersionProto.parseFrom(data));
- return version;
}
// Only used for test
@@ -1726,6 +1748,8 @@ public class LeveldbTimelineStore extends AbstractService
writeBatch.put(domainEntryKey, timestamps);
writeBatch.put(ownerLookupEntryKey, timestamps);
db.write(writeBatch);
+ } catch(DBException e) {
+ throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, writeBatch);
}
@@ -1754,13 +1778,15 @@ public class LeveldbTimelineStore extends AbstractService
@Override
public TimelineDomain getDomain(String domainId)
throws IOException {
- DBIterator iterator = null;
+ LeveldbIterator iterator = null;
try {
byte[] prefix = KeyBuilder.newInstance()
.add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup();
- iterator = db.iterator();
+ iterator = new LeveldbIterator(db);
iterator.seek(prefix);
return getTimelineDomain(iterator, domainId, prefix);
+ } catch(DBException e) {
+ throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, iterator);
}
@@ -1769,12 +1795,12 @@ public class LeveldbTimelineStore extends AbstractService
@Override
public TimelineDomains getDomains(String owner)
throws IOException {
- DBIterator iterator = null;
+ LeveldbIterator iterator = null;
try {
byte[] prefix = KeyBuilder.newInstance()
.add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup();
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
- for (iterator = db.iterator(), iterator.seek(prefix);
+ for (iterator = new LeveldbIterator(db), iterator.seek(prefix);
iterator.hasNext();) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)) {
@@ -1809,13 +1835,15 @@ public class LeveldbTimelineStore extends AbstractService
TimelineDomains domainsToReturn = new TimelineDomains();
domainsToReturn.addDomains(domains);
return domainsToReturn;
+ } catch(DBException e) {
+ throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, iterator);
}
}
private static TimelineDomain getTimelineDomain(
- DBIterator iterator, String domainId, byte[] prefix) throws IOException {
+ LeveldbIterator iterator, String domainId, byte[] prefix) throws IOException {
// Iterate over all the rows whose key starts with prefix to retrieve the
// domain information.
TimelineDomain domain = new TimelineDomain();
@@ -1852,5 +1880,5 @@ public class LeveldbTimelineStore extends AbstractService
} else {
return domain;
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ce4d33c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
index 5ebc96b..d266aa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
@@ -45,7 +45,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
-import org.iq80.leveldb.DBIterator;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.iq80.leveldb.DBException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -146,13 +147,15 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
private boolean deleteNextEntity(String entityType, byte[] ts)
throws IOException, InterruptedException {
- DBIterator iterator = null;
- DBIterator pfIterator = null;
+ LeveldbIterator iterator = null;
+ LeveldbIterator pfIterator = null;
try {
iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
iterator, pfIterator, false);
+ } catch(DBException e) {
+ throw new IOException(e);
} finally {
IOUtils.cleanup(null, iterator, pfIterator);
}