You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/12 20:31:52 UTC
svn commit: r694770 - in
/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store:
KahaDBPersistenceAdaptor.java MessageDatabase.java
Author: chirino
Date: Fri Sep 12 11:31:51 2008
New Revision: 694770
URL: http://svn.apache.org/viewvc?rev=694770&view=rev
Log:
Added a new index to order messages by a sequence id instead of by the Location.
The problem was that messages in a transaction have a location id that comes earlier
than messages not in a transaction, but those transacted messages should be recovered
after the non-transacted messages.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=694770&r1=694769&r2=694770&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java Fri Sep 12 11:31:51 2008
@@ -50,6 +50,7 @@
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.store.MessageDatabase.MessageKeys;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaCommitCommand;
import org.apache.kahadb.store.data.KahaDestination;
@@ -168,7 +169,11 @@
location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>(){
public Location execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
- return sd.messageIdIndex.get(tx, key);
+ Long sequence = sd.messageIdIndex.get(tx, key);
+ if( sequence ==null ) {
+ return null;
+ }
+ return sd.orderIndex.get(tx, sequence).location;
}
});
}
@@ -186,7 +191,7 @@
// Iterate through all index entries to get a count of messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
int rc=0;
- for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+ for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
iterator.next();
rc++;
}
@@ -201,37 +206,34 @@
pageFile.tx().execute(new Transaction.Closure<Exception>(){
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
- for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
- Entry<Location, String> entry = iterator.next();
- listener.recoverMessage( loadMessage(entry.getKey() ) );
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+ Entry<Long, MessageKeys> entry = iterator.next();
+ listener.recoverMessage( loadMessage(entry.getValue().location) );
}
}
});
}
}
- Location cursorPos=null;
+ long cursorPos=0;
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
- Entry<Location, String> entry=null;
+ Entry<Long, MessageKeys> entry=null;
int counter = 0;
- for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
entry = iterator.next();
- listener.recoverMessage( loadMessage(entry.getKey() ) );
+ listener.recoverMessage( loadMessage(entry.getValue().location ) );
counter++;
if( counter >= maxReturned ) {
break;
}
}
if( entry!=null ) {
- // Copy the location, cause the iterator gives us the key by reference.. changing it
- // would mess up the index.
- cursorPos = new Location(entry.getKey());
- cursorPos.setOffset(entry.getKey().getOffset()+1 );
+ cursorPos = entry.getKey()+1;
}
}
});
@@ -239,7 +241,7 @@
}
public void resetBatching() {
- cursorPos=null;
+ cursorPos=0;
}
public void setMemoryUsage(MemoryUsage memoeyUSage) {
@@ -330,16 +332,15 @@
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
- Location cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+ Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
if ( cursorPos==null ) {
// The subscription might not exist.
return 0;
}
- cursorPos = new Location(cursorPos);
- cursorPos.setOffset(cursorPos.getOffset()+1 );
+ cursorPos += 1;
int counter = 0;
- for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
iterator.next();
counter++;
}
@@ -355,12 +356,12 @@
pageFile.tx().execute(new Transaction.Closure<Exception>(){
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
- Location cursorPos = new Location(sd.subscriptionAcks.get(tx, subscriptionKey));
- cursorPos.setOffset(cursorPos.getOffset()+1 );
+ Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+ cursorPos += 1;
- for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
- Entry<Location, String> entry = iterator.next();
- listener.recoverMessage( loadMessage(entry.getKey() ) );
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ Entry<Long, MessageKeys> entry = iterator.next();
+ listener.recoverMessage( loadMessage(entry.getValue().location ) );
}
}
});
@@ -373,28 +374,24 @@
pageFile.tx().execute(new Transaction.Closure<Exception>(){
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
- Location cursorPos = sd.subscriptionCursors.get(subscriptionKey);
+ Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
if( cursorPos == null ) {
- cursorPos = new Location(sd.subscriptionAcks.get(tx, subscriptionKey));
- cursorPos.setOffset(cursorPos.getOffset()+1 );
+ cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+ cursorPos += 1;
}
- Entry<Location, String> entry=null;
+ Entry<Long, MessageKeys> entry=null;
int counter = 0;
- for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
entry = iterator.next();
- listener.recoverMessage( loadMessage(entry.getKey() ) );
+ listener.recoverMessage( loadMessage(entry.getValue().location ) );
counter++;
if( counter >= maxReturned ) {
break;
}
}
if( entry!=null ) {
- // Copy the location, cause the iterator gives us the key by reference.. changing it
- // would mess up the index.
- cursorPos = new Location(entry.getKey());
- cursorPos.setOffset(entry.getKey().getOffset()+1 );
- sd.subscriptionCursors.put(subscriptionKey, cursorPos);
+ sd.subscriptionCursors.put(subscriptionKey, cursorPos+1);
}
}
});
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=694770&r1=694769&r2=694770&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Fri Sep 12 11:31:51 2008
@@ -41,6 +41,7 @@
import org.apache.activemq.command.XATransactionId;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.LongMarshaller;
import org.apache.kahadb.Marshaller;
import org.apache.kahadb.StringMarshaller;
import org.apache.kahadb.index.BTreeIndex;
@@ -572,38 +573,43 @@
// Skip adding the message to the index if this is a topic and there are
// no subscriptions.
- if (sd.subscriptions != null && sd.ackLocations.isEmpty()) {
+ if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
return;
}
// Add the message.
- sd.orderIndex.put(tx, location, command.getMessageId());
- sd.messageIdIndex.put(tx, command.getMessageId(), location);
+ long id = sd.nextMessageId++;
+ sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+ sd.locationIndex.put(tx, location, id);
+ sd.messageIdIndex.put(tx, command.getMessageId(), id);
}
private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
if (!command.hasSubscriptionKey()) {
// In the queue case we just remove the message from the index..
- Location messageLocation = sd.messageIdIndex.remove(tx, command.getMessageId());
- if (messageLocation != null) {
- sd.orderIndex.remove(tx, messageLocation);
+ Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
+ if (sequenceId != null) {
+ MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+ if( keys!=null ) {
+ sd.locationIndex.remove(tx, keys.location);
+ }
}
} else {
// In the topic case we need remove the message once it's been acked
// by all the subs
- Location messageLocation = sd.messageIdIndex.get(tx, command.getMessageId());
+ Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
// Make sure it's a valid message id...
- if (messageLocation != null) {
+ if (sequence != null) {
String subscriptionKey = command.getSubscriptionKey();
- Location prev = sd.subscriptionAcks.put(tx, subscriptionKey, messageLocation);
+ Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, prev);
// Add it to the new location set.
- addAckLocation(sd, messageLocation, subscriptionKey);
+ addAckLocation(sd, sequence, subscriptionKey);
}
}
@@ -612,18 +618,24 @@
private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
sd.orderIndex.clear(tx);
- sd.messageIdIndex.clear(tx);
sd.orderIndex.unload(tx);
- sd.messageIdIndex.unload(tx);
tx.free(sd.orderIndex.getPageId());
+
+ sd.locationIndex.clear(tx);
+ sd.locationIndex.unload(tx);
+ tx.free(sd.locationIndex.getPageId());
+
+ sd.messageIdIndex.clear(tx);
+ sd.messageIdIndex.unload(tx);
tx.free(sd.messageIdIndex.getPageId());
if (sd.subscriptions != null) {
sd.subscriptions.clear(tx);
- sd.subscriptionAcks.clear(tx);
sd.subscriptions.unload(tx);
- sd.subscriptionAcks.unload(tx);
tx.free(sd.subscriptions.getPageId());
+
+ sd.subscriptionAcks.clear(tx);
+ sd.subscriptionAcks.unload(tx);
tx.free(sd.subscriptionAcks.getPageId());
}
@@ -639,11 +651,9 @@
if (command.hasSubscriptionInfo()) {
String subscriptionKey = command.getSubscriptionKey();
sd.subscriptions.put(tx, subscriptionKey, command);
- Location ackLocation;
- if (command.getRetroactive()) {
- ackLocation = new Location(0, 0);
- } else {
- ackLocation = location;
+ long ackLocation=-1;
+ if (!command.getRetroactive()) {
+ ackLocation = sd.nextMessageId-1;
}
sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
@@ -652,8 +662,10 @@
// delete the sub...
String subscriptionKey = command.getSubscriptionKey();
sd.subscriptions.remove(tx, subscriptionKey);
- Location prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
- removeAckLocation(tx, sd, subscriptionKey, prev);
+ Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
+ if( prev!=null ) {
+ removeAckLocation(tx, sd, subscriptionKey, prev);
+ }
}
}
@@ -672,7 +684,7 @@
for (StoredDestination sd : storedDestinations.values()) {
// Use a visitor to cut down the number of pages that we load
- sd.orderIndex.visit(tx, new BTreeVisitor<Location, String>() {
+ sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
int last=-1;
public boolean isInterestedInKeysBetween(Location first, Location second) {
if( second!=null ) {
@@ -687,7 +699,7 @@
return true;
}
- public void visit(List<Location> keys, List<String> values) {
+ public void visit(List<Location> keys, List<Long> values) {
for (int i = 0; i < keys.size(); i++) {
if( last != keys.get(i).getDataFileId() ) {
inUseFiles.add(keys.get(i).getDataFileId());
@@ -730,16 +742,45 @@
Location lastAckLocation;
Location cursor;
}
+
+ static class MessageKeys {
+ final String messageId;
+ final Location location;
+
+ public MessageKeys(String messageId, Location location) {
+ this.messageId=messageId;
+ this.location=location;
+ }
+ }
+
+ static protected class MessageKeysMarshaller implements Marshaller<MessageKeys> {
+ static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
+
+ public Class<MessageKeys> getType() {
+ return MessageKeys.class;
+ }
+ public MessageKeys readPayload(DataInput dataIn) throws IOException {
+ return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
+ }
+
+ public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
+ dataOut.writeUTF(object.messageId);
+ LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
+ }
+ }
+
static class StoredDestination {
- BTreeIndex<Location, String> orderIndex;
- BTreeIndex<String, Location> messageIdIndex;
+ long nextMessageId;
+ BTreeIndex<Long, MessageKeys> orderIndex;
+ BTreeIndex<Location, Long> locationIndex;
+ BTreeIndex<String, Long> messageIdIndex;
// These bits are only set for Topics
BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
- BTreeIndex<String, Location> subscriptionAcks;
- HashMap<String, Location> subscriptionCursors;
- TreeMap<Location, HashSet<String>> ackLocations;
+ BTreeIndex<String, Long> subscriptionAcks;
+ HashMap<String, Long> subscriptionCursors;
+ TreeMap<Long, HashSet<String>> ackPositions;
}
protected class StoredDestinationMarshaller implements Marshaller<StoredDestination> {
@@ -749,18 +790,20 @@
public StoredDestination readPayload(DataInput dataIn) throws IOException {
StoredDestination value = new StoredDestination();
- value.orderIndex = new BTreeIndex<Location, String>(pageFile, dataIn.readLong());
- value.messageIdIndex = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
+ value.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+ value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
+ value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
if (dataIn.readBoolean()) {
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
- value.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
+ value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
}
return value;
}
public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
dataOut.writeLong(value.orderIndex.getPageId());
+ dataOut.writeLong(value.locationIndex.getPageId());
dataOut.writeLong(value.messageIdIndex.getPageId());
if (value.subscriptions != null) {
dataOut.writeBoolean(true);
@@ -841,25 +884,36 @@
if (rc == null) {
// Brand new destination.. allocate indexes for it.
rc = new StoredDestination();
- rc.orderIndex = new BTreeIndex<Location, String>(pageFile, tx.allocate());
- rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, tx.allocate());
+ rc.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+ rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
+ rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
if (topic) {
rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
- rc.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, tx.allocate());
+ rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
}
metadata.destinations.put(tx, key, rc);
}
// Configure the marshalers and load.
- rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
- rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
+ rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+ rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
rc.orderIndex.load(tx);
+ // Figure out the next key using the last entry in the destination.
+ Entry<Long, MessageKeys> lastEntry = rc.orderIndex.getLast(tx);
+ if( lastEntry!=null ) {
+ rc.nextMessageId = lastEntry.getKey()+1;
+ }
+
+ rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
+ rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+ rc.locationIndex.load(tx);
+
rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
- rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
+ rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
rc.messageIdIndex.load(tx);
-
+
// If it was a topic...
if (topic) {
@@ -868,14 +922,14 @@
rc.subscriptions.load(tx);
rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
- rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
+ rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
rc.subscriptionAcks.load(tx);
- rc.ackLocations = new TreeMap<Location, HashSet<String>>();
- rc.subscriptionCursors = new HashMap<String, Location>();
+ rc.ackPositions = new TreeMap<Long, HashSet<String>>();
+ rc.subscriptionCursors = new HashMap<String, Long>();
- for (Iterator<Entry<String, Location>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
- Entry<String, Location> entry = iterator.next();
+ for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+ Entry<String, Long> entry = iterator.next();
addAckLocation(rc, entry.getValue(), entry.getKey());
}
@@ -885,14 +939,14 @@
/**
* @param sd
- * @param messageLocation
+ * @param messageSequence
* @param subscriptionKey
*/
- private void addAckLocation(StoredDestination sd, Location messageLocation, String subscriptionKey) {
- HashSet<String> hs = sd.ackLocations.get(messageLocation);
+ private void addAckLocation(StoredDestination sd, Long messageSequence, String subscriptionKey) {
+ HashSet<String> hs = sd.ackPositions.get(messageSequence);
if (hs == null) {
hs = new HashSet<String>();
- sd.ackLocations.put(messageLocation, hs);
+ sd.ackPositions.put(messageSequence, hs);
}
hs.add(subscriptionKey);
}
@@ -901,18 +955,18 @@
* @param tx
* @param sd
* @param subscriptionKey
- * @param location
+ * @param sequenceId
* @throws IOException
*/
- private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Location location) throws IOException {
+ private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
// Remove the sub from the previous location set..
- if (location != null) {
- HashSet<String> hs = sd.ackLocations.get(location);
+ if (sequenceId != null) {
+ HashSet<String> hs = sd.ackPositions.get(sequenceId);
if (hs != null) {
hs.remove(subscriptionKey);
if (hs.isEmpty()) {
- HashSet<String> firstSet = sd.ackLocations.values().iterator().next();
- sd.ackLocations.remove(location);
+ HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
+ sd.ackPositions.remove(sequenceId);
// Did we just empty out the first set in the
// ordered list of ack locations? Then it's time to
@@ -920,10 +974,10 @@
if (hs == firstSet) {
// Find all the entries that need to get deleted.
- ArrayList<Entry<Location, String>> deletes = new ArrayList<Entry<Location, String>>();
- for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
- Entry<Location, String> entry = iterator.next();
- while (entry.getKey().compareTo(location) <= 0) {
+ ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+ Entry<Long, MessageKeys> entry = iterator.next();
+ while (entry.getKey().compareTo(sequenceId) <= 0) {
// We don't do the actually delete while we are
// iterating the BTree since
// iterating would fail.
@@ -932,11 +986,11 @@
}
// Do the actual deletes.
- for (Entry<Location, String> entry : deletes) {
- sd.messageIdIndex.remove(tx, entry.getValue());
- sd.orderIndex.remove(tx, entry.getKey());
+ for (Entry<Long, MessageKeys> entry : deletes) {
+ sd.locationIndex.remove(tx, entry.getValue().location);
+ sd.messageIdIndex.remove(tx,entry.getValue().messageId);
+ sd.orderIndex.remove(tx,entry.getKey());
}
-
}
}
}