You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2014/11/24 23:39:22 UTC

hadoop git commit: YARN-1984. LeveldbTimelineStore does not handle db exceptions properly. Contributed by Varun Saxena (cherry picked from commit 1ce4d33c2dc86d711b227a04d2f9a2ab696a24a1)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 69df08aad -> 89ef49fb0


YARN-1984. LeveldbTimelineStore does not handle db exceptions properly. Contributed by Varun Saxena
(cherry picked from commit 1ce4d33c2dc86d711b227a04d2f9a2ab696a24a1)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/89ef49fb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/89ef49fb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/89ef49fb

Branch: refs/heads/branch-2
Commit: 89ef49fb0814baea3640798fdf66d2ae3a550896
Parents: 69df08a
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Nov 24 22:36:59 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Nov 24 22:38:09 2014 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../server/timeline/LeveldbTimelineStore.java   | 206 +++++++++++--------
 .../timeline/TestLeveldbTimelineStore.java      |   9 +-
 3 files changed, 126 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/89ef49fb/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 12a79e8..139f9c5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -108,6 +108,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2315. FairScheduler: Set current capacity in addition to capacity.
     (Zhihai Xu via kasha)
 
+    YARN-1984. LeveldbTimelineStore does not handle db exceptions properly
+    (Varun Saxena via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89ef49fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
index c4ea996..33deb80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
@@ -66,10 +66,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBException;
-import org.iq80.leveldb.DBIterator;
 import org.iq80.leveldb.Options;
 import org.iq80.leveldb.ReadOptions;
 import org.iq80.leveldb.WriteBatch;
@@ -438,13 +438,15 @@ public class LeveldbTimelineStore extends AbstractService
         .add(entityType).add(writeReverseOrderedLong(revStartTime))
         .add(entityId).getBytesForLookup();
 
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
-      iterator = db.iterator();
+      iterator = new LeveldbIterator(db);
       iterator.seek(prefix);
 
       return getEntity(entityId, entityType, revStartTime, fields, iterator,
           prefix, prefix.length);
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
@@ -455,7 +457,7 @@ public class LeveldbTimelineStore extends AbstractService
    * specified fields for this entity, return null.
    */
   private static TimelineEntity getEntity(String entityId, String entityType,
-      Long startTime, EnumSet<Field> fields, DBIterator iterator,
+      Long startTime, EnumSet<Field> fields, LeveldbIterator iterator,
       byte[] prefix, int prefixlen) throws IOException {
     if (fields == null) {
       fields = EnumSet.allOf(Field.class);
@@ -562,7 +564,7 @@ public class LeveldbTimelineStore extends AbstractService
                 o2.length);
           }
         });
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       // look up start times for the specified entities
       // skip entities with no start time
@@ -606,7 +608,7 @@ public class LeveldbTimelineStore extends AbstractService
           if (limit == null) {
             limit = DEFAULT_LIMIT;
           }
-          iterator = db.iterator();
+          iterator = new LeveldbIterator(db);
           for (iterator.seek(first); entity.getEvents().size() < limit &&
               iterator.hasNext(); iterator.next()) {
             byte[] key = iterator.peekNext().getKey();
@@ -623,6 +625,8 @@ public class LeveldbTimelineStore extends AbstractService
           }
         }
       }
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
@@ -683,7 +687,7 @@ public class LeveldbTimelineStore extends AbstractService
       String entityType, Long limit, Long starttime, Long endtime,
       String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
       EnumSet<Field> fields) throws IOException {
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
       // only db keys matching the prefix (base + entity type) will be parsed
@@ -724,7 +728,7 @@ public class LeveldbTimelineStore extends AbstractService
       }
 
       TimelineEntities entities = new TimelineEntities();
-      iterator = db.iterator();
+      iterator = new LeveldbIterator(db);
       iterator.seek(first);
       // iterate until one of the following conditions is met: limit is
       // reached, there are no more keys, the key prefix no longer matches,
@@ -783,10 +787,23 @@ public class LeveldbTimelineStore extends AbstractService
         }
       }
       return entities;
+    } catch(DBException e) {
+      throw new IOException(e);   	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
   }
+  
+  /**
+   * Handle error and set it in response.
+   */
+  private static void handleError(TimelineEntity entity, TimelinePutResponse response, final int errorCode) {
+    TimelinePutError error = new TimelinePutError();
+    error.setEntityId(entity.getEntityId());
+    error.setEntityType(entity.getEntityType());
+    error.setErrorCode(errorCode);
+    response.addError(error);
+  }
 
   /**
    * Put a single entity.  If there is an error, add a TimelinePutError to the
@@ -812,11 +829,7 @@ public class LeveldbTimelineStore extends AbstractService
           entity.getStartTime(), events);
       if (startAndInsertTime == null) {
         // if no start time is found, add an error and return
-        TimelinePutError error = new TimelinePutError();
-        error.setEntityId(entity.getEntityId());
-        error.setEntityType(entity.getEntityType());
-        error.setErrorCode(TimelinePutError.NO_START_TIME);
-        response.addError(error);
+        handleError(entity, response, TimelinePutError.NO_START_TIME);   
         return;
       }
       revStartTime = writeReverseOrderedLong(startAndInsertTime
@@ -883,11 +896,7 @@ public class LeveldbTimelineStore extends AbstractService
               if (!domainId.equals(entity.getDomainId())) {
                 // in this case the entity will be put, but the relation will be
                 // ignored
-                TimelinePutError error = new TimelinePutError();
-                error.setEntityId(entity.getEntityId());
-                error.setEntityType(entity.getEntityType());
-                error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
-                response.addError(error);
+                handleError(entity, response, TimelinePutError.FORBIDDEN_RELATION);
                 continue;
               }
             }
@@ -933,11 +942,7 @@ public class LeveldbTimelineStore extends AbstractService
       if (entity.getDomainId() == null ||
           entity.getDomainId().length() == 0) {
         if (!allowEmptyDomainId) {
-          TimelinePutError error = new TimelinePutError();
-          error.setEntityId(entity.getEntityId());
-          error.setEntityType(entity.getEntityType());
-          error.setErrorCode(TimelinePutError.NO_DOMAIN);
-          response.addError(error);
+          handleError(entity, response, TimelinePutError.NO_DOMAIN);
           return;
         }
       } else {
@@ -946,14 +951,14 @@ public class LeveldbTimelineStore extends AbstractService
             entity.getDomainId().getBytes());
       }
       db.write(writeBatch);
+    } catch (DBException de) {
+      LOG.error("Error putting entity " + entity.getEntityId() +
+                " of type " + entity.getEntityType(), de);
+      handleError(entity, response, TimelinePutError.IO_EXCEPTION);
     } catch (IOException e) {
       LOG.error("Error putting entity " + entity.getEntityId() +
           " of type " + entity.getEntityType(), e);
-      TimelinePutError error = new TimelinePutError();
-      error.setEntityId(entity.getEntityId());
-      error.setEntityType(entity.getEntityType());
-      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
-      response.addError(error);
+      handleError(entity, response, TimelinePutError.IO_EXCEPTION);
     } finally {
       lock.unlock();
       writeLocks.returnLock(lock);
@@ -983,15 +988,16 @@ public class LeveldbTimelineStore extends AbstractService
             relatedEntity.getType(), relatedEntityStartTime),
             writeReverseOrderedLong(relatedEntityStartAndInsertTime
                 .insertTime));
+      } catch (DBException de) {
+        LOG.error("Error putting related entity " + relatedEntity.getId() +
+            " of type " + relatedEntity.getType() + " for entity " +
+            entity.getEntityId() + " of type " + entity.getEntityType(), de);
+        handleError(entity, response, TimelinePutError.IO_EXCEPTION);
       } catch (IOException e) {
         LOG.error("Error putting related entity " + relatedEntity.getId() +
             " of type " + relatedEntity.getType() + " for entity " +
             entity.getEntityId() + " of type " + entity.getEntityType(), e);
-        TimelinePutError error = new TimelinePutError();
-        error.setEntityId(entity.getEntityId());
-        error.setEntityType(entity.getEntityType());
-        error.setErrorCode(TimelinePutError.IO_EXCEPTION);
-        response.addError(error);
+        handleError(entity, response, TimelinePutError.IO_EXCEPTION);
       } finally {
         lock.unlock();
         writeLocks.returnLock(lock);
@@ -1072,23 +1078,27 @@ public class LeveldbTimelineStore extends AbstractService
   private Long getStartTimeLong(String entityId, String entityType)
       throws IOException {
     EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
-    // start time is not provided, so try to look it up
-    if (startTimeReadCache.containsKey(entity)) {
-      // found the start time in the cache
-      return startTimeReadCache.get(entity);
-    } else {
-      // try to look up the start time in the db
-      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
-      byte[] v = db.get(b);
-      if (v == null) {
-        // did not find the start time in the db
-        return null;
+    try {
+      // start time is not provided, so try to look it up
+      if (startTimeReadCache.containsKey(entity)) {
+        // found the start time in the cache
+        return startTimeReadCache.get(entity);
       } else {
-        // found the start time in the db
-        Long l = readReverseOrderedLong(v, 0);
-        startTimeReadCache.put(entity, l);
-        return l;
+        // try to look up the start time in the db
+        byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+        byte[] v = db.get(b);
+        if (v == null) {
+          // did not find the start time in the db
+          return null;
+        } else {
+          // found the start time in the db
+          Long l = readReverseOrderedLong(v, 0);
+          startTimeReadCache.put(entity, l);
+          return l;
+        }
       }
+    } catch(DBException e) {
+      throw new IOException(e);   
     }
   }
 
@@ -1152,28 +1162,32 @@ public class LeveldbTimelineStore extends AbstractService
     StartAndInsertTime startAndInsertTime = null;
     // create lookup key for start time
     byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
-    // retrieve value for key
-    byte[] v = db.get(b);
-    if (v == null) {
-      // start time doesn't exist in db
-      if (suggestedStartTime == null) {
-        return null;
+    try {
+      // retrieve value for key
+      byte[] v = db.get(b);
+      if (v == null) {
+        // start time doesn't exist in db
+        if (suggestedStartTime == null) {
+          return null;
+        }
+        startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
+            System.currentTimeMillis());
+        
+        // write suggested start time
+        v = new byte[16];
+        writeReverseOrderedLong(suggestedStartTime, v, 0);
+        writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
+        WriteOptions writeOptions = new WriteOptions();
+        writeOptions.sync(true);
+        db.put(b, v, writeOptions);
+      } else {
+        // found start time in db, so ignore suggested start time
+        startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
+            readReverseOrderedLong(v, 8));
       }
-      startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
-          System.currentTimeMillis());
-
-      // write suggested start time
-      v = new byte[16];
-      writeReverseOrderedLong(suggestedStartTime, v, 0);
-      writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
-      WriteOptions writeOptions = new WriteOptions();
-      writeOptions.sync(true);
-      db.put(b, v, writeOptions);
-    } else {
-      // found start time in db, so ignore suggested start time
-      startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
-          readReverseOrderedLong(v, 8));
-    }
+    } catch(DBException e) {
+      throw new IOException(e);            	
+    } 
     startTimeWriteCache.put(entity, startAndInsertTime);
     startTimeReadCache.put(entity, startAndInsertTime.startTime);
     return startAndInsertTime;
@@ -1373,7 +1387,7 @@ public class LeveldbTimelineStore extends AbstractService
 
   @VisibleForTesting
   List<String> getEntityTypes() throws IOException {
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       iterator = getDbIterator(false);
       List<String> entityTypes = new ArrayList<String>();
@@ -1396,6 +1410,8 @@ public class LeveldbTimelineStore extends AbstractService
         iterator.seek(lookupKey);
       }
       return entityTypes;
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
@@ -1406,7 +1422,7 @@ public class LeveldbTimelineStore extends AbstractService
    * the given write batch.
    */
   private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
-      DBIterator iterator) {
+      LeveldbIterator iterator) {
     for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
       byte[] key = iterator.peekNext().getKey();
       if (!prefixMatches(prefix, prefix.length, key)) {
@@ -1418,7 +1434,7 @@ public class LeveldbTimelineStore extends AbstractService
 
   @VisibleForTesting
   boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
-      DBIterator iterator, DBIterator pfIterator, boolean seeked)
+      LeveldbIterator iterator, LeveldbIterator pfIterator, boolean seeked)
       throws IOException {
     WriteBatch writeBatch = null;
     try {
@@ -1524,6 +1540,8 @@ public class LeveldbTimelineStore extends AbstractService
       writeOptions.sync(true);
       db.write(writeBatch, writeOptions);
       return true;
+    } catch(DBException e) {
+      throw new IOException(e);
     } finally {
       IOUtils.cleanup(LOG, writeBatch);
     }
@@ -1542,8 +1560,8 @@ public class LeveldbTimelineStore extends AbstractService
     try {
       List<String> entityTypes = getEntityTypes();
       for (String entityType : entityTypes) {
-        DBIterator iterator = null;
-        DBIterator pfIterator = null;
+        LeveldbIterator iterator = null;
+        LeveldbIterator pfIterator = null;
         long typeCount = 0;
         try {
           deleteLock.writeLock().lock();
@@ -1583,21 +1601,25 @@ public class LeveldbTimelineStore extends AbstractService
   }
 
   @VisibleForTesting
-  DBIterator getDbIterator(boolean fillCache) {
+  LeveldbIterator getDbIterator(boolean fillCache) {
     ReadOptions readOptions = new ReadOptions();
     readOptions.fillCache(fillCache);
-    return db.iterator(readOptions);
+    return new LeveldbIterator(db, readOptions);
   }
   
   Version loadVersion() throws IOException {
-    byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
-    // if version is not stored previously, treat it as 1.0.
-    if (data == null || data.length == 0) {
-      return Version.newInstance(1, 0);
+    try {
+      byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
+      // if version is not stored previously, treat it as 1.0.
+      if (data == null || data.length == 0) {
+        return Version.newInstance(1, 0);
+      }
+      Version version =
+          new VersionPBImpl(VersionProto.parseFrom(data));
+      return version;
+    } catch(DBException e) {
+      throw new IOException(e);    	
     }
-    Version version =
-        new VersionPBImpl(VersionProto.parseFrom(data));
-    return version;
   }
   
   // Only used for test
@@ -1726,6 +1748,8 @@ public class LeveldbTimelineStore extends AbstractService
       writeBatch.put(domainEntryKey, timestamps);
       writeBatch.put(ownerLookupEntryKey, timestamps);
       db.write(writeBatch);
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, writeBatch);
     }
@@ -1754,13 +1778,15 @@ public class LeveldbTimelineStore extends AbstractService
   @Override
   public TimelineDomain getDomain(String domainId)
       throws IOException {
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       byte[] prefix = KeyBuilder.newInstance()
           .add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup();
-      iterator = db.iterator();
+      iterator = new LeveldbIterator(db);
       iterator.seek(prefix);
       return getTimelineDomain(iterator, domainId, prefix);
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
@@ -1769,12 +1795,12 @@ public class LeveldbTimelineStore extends AbstractService
   @Override
   public TimelineDomains getDomains(String owner)
       throws IOException {
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       byte[] prefix = KeyBuilder.newInstance()
           .add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup();
       List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
-      for (iterator = db.iterator(), iterator.seek(prefix);
+      for (iterator = new LeveldbIterator(db), iterator.seek(prefix);
           iterator.hasNext();) {
         byte[] key = iterator.peekNext().getKey();
         if (!prefixMatches(prefix, prefix.length, key)) {
@@ -1809,13 +1835,15 @@ public class LeveldbTimelineStore extends AbstractService
       TimelineDomains domainsToReturn = new TimelineDomains();
       domainsToReturn.addDomains(domains);
       return domainsToReturn;
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
   }
 
   private static TimelineDomain getTimelineDomain(
-      DBIterator iterator, String domainId, byte[] prefix) throws IOException {
+      LeveldbIterator iterator, String domainId, byte[] prefix) throws IOException {
     // Iterate over all the rows whose key starts with prefix to retrieve the
     // domain information.
     TimelineDomain domain = new TimelineDomain();
@@ -1852,5 +1880,5 @@ public class LeveldbTimelineStore extends AbstractService
     } else {
       return domain;
     }
-  }
+  }    
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89ef49fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
index 5ebc96b..d266aa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
@@ -45,7 +45,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
-import org.iq80.leveldb.DBIterator;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.iq80.leveldb.DBException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -146,13 +147,15 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
 
   private boolean deleteNextEntity(String entityType, byte[] ts)
       throws IOException, InterruptedException {
-    DBIterator iterator = null;
-    DBIterator pfIterator = null;
+    LeveldbIterator iterator = null;
+    LeveldbIterator pfIterator = null;
     try {
       iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
       pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
       return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
           iterator, pfIterator, false);
+    } catch(DBException e) {
+      throw new IOException(e);   	
     } finally {
       IOUtils.cleanup(null, iterator, pfIterator);
     }