You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by lj...@apache.org on 2019/03/11 09:25:33 UTC

[hadoop] branch trunk updated: HDDS-1220. KeyManager#openKey should release the bucket lock before doing an allocateBlock. Contributed by Lokesh Jain.

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ebb5fa1  HDDS-1220. KeyManager#openKey should release the bucket lock before doing an allocateBlock. Contributed by Lokesh Jain.
ebb5fa1 is described below

commit ebb5fa115b842a3bcb747e881ca2a8fe4c716d07
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Mon Mar 11 14:54:44 2019 +0530

    HDDS-1220. KeyManager#openKey should release the bucket lock before doing an allocateBlock. Contributed by Lokesh Jain.
---
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 126 +++++------
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 230 ++++++++++-----------
 2 files changed, 161 insertions(+), 195 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 5caeaea..6500a5e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -240,37 +240,49 @@ public class KeyManagerImpl implements KeyManager {
           OMException.ResultCodes.KEY_NOT_FOUND);
     }
 
-    AllocatedBlock allocatedBlock;
-    try {
-      allocatedBlock =
-          scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
-              keyInfo.getFactor(), omId, excludeList);
-    } catch (SCMException ex) {
-      if (ex.getResult()
-          .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
-        throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
-      }
-      throw ex;
-    }
-    OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
-        .setBlockID(new BlockID(allocatedBlock.getBlockID()))
-        .setLength(scmBlockSize)
-        .setOffset(0);
-    if (grpcBlockTokenEnabled) {
-      String remoteUser = getRemoteUser().getShortUserName();
-      builder.setToken(secretManager.generateToken(remoteUser,
-          allocatedBlock.getBlockID().toString(),
-          getAclForUser(remoteUser),
-          scmBlockSize));
-    }
-    OmKeyLocationInfo info = builder.build();
     // current version not committed, so new blocks coming now are added to
     // the same version
-    keyInfo.appendNewBlocks(Collections.singletonList(info));
+    List<OmKeyLocationInfo> locationInfos =
+        allocateBlock(keyInfo, excludeList, scmBlockSize);
+    keyInfo.appendNewBlocks(locationInfos);
     keyInfo.updateModifcationTime();
     metadataManager.getOpenKeyTable().put(openKey,
         keyInfo);
-    return info;
+    return locationInfos.get(0);
+  }
+
+  private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo,
+      ExcludeList excludeList, long requestedSize) throws IOException {
+    int numBlocks = (int) ((requestedSize - 1) / scmBlockSize + 1);
+    List<OmKeyLocationInfo> locationInfos = new ArrayList<>(numBlocks);
+    while (requestedSize > 0) {
+      long allocateSize = Math.min(requestedSize, scmBlockSize);
+      AllocatedBlock allocatedBlock;
+      try {
+        allocatedBlock = scmBlockClient
+            .allocateBlock(allocateSize, keyInfo.getType(), keyInfo.getFactor(),
+                omId, excludeList);
+      } catch (SCMException ex) {
+        if (ex.getResult()
+            .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
+          throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
+        }
+        throw ex;
+      }
+      OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
+          .setBlockID(new BlockID(allocatedBlock.getBlockID()))
+          .setLength(scmBlockSize)
+          .setOffset(0);
+      if (grpcBlockTokenEnabled) {
+        String remoteUser = getRemoteUser().getShortUserName();
+        builder.setToken(secretManager
+            .generateToken(remoteUser, allocatedBlock.getBlockID().toString(),
+                getAclForUser(remoteUser), scmBlockSize));
+      }
+      locationInfos.add(builder.build());
+      requestedSize -= allocateSize;
+    }
+    return locationInfos;
   }
 
   /* Optimize ugi lookup for RPC operations to avoid a trip through
@@ -327,6 +339,10 @@ public class KeyManagerImpl implements KeyManager {
     ReplicationFactor factor = args.getFactor();
     ReplicationType type = args.getType();
     long currentTime = Time.monotonicNowNanos();
+    long requestedSize = Math.min(preallocateMax, args.getDataSize());
+    OmKeyInfo keyInfo;
+    String openKey;
+    long openVersion;
 
     FileEncryptionInfo encInfo = null;
     OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
@@ -378,55 +394,14 @@ public class KeyManagerImpl implements KeyManager {
           type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
         }
       }
-      long requestedSize = Math.min(preallocateMax, args.getDataSize());
       List<OmKeyLocationInfo> locations = new ArrayList<>();
       String objectKey = metadataManager.getOzoneKey(
           volumeName, bucketName, keyName);
-      // requested size is not required but more like a optimization:
-      // SCM looks at the requested, if it 0, no block will be allocated at
-      // the point, if client needs more blocks, client can always call
-      // allocateBlock. But if requested size is not 0, OM will preallocate
-      // some blocks and piggyback to client, to save RPC calls.
-      while (requestedSize > 0) {
-        long allocateSize = Math.min(scmBlockSize, requestedSize);
-        AllocatedBlock allocatedBlock;
-        try {
-          allocatedBlock = scmBlockClient
-              .allocateBlock(allocateSize, type, factor, omId,
-                  new ExcludeList());
-        } catch (IOException ex) {
-          if (ex instanceof SCMException) {
-            if (((SCMException) ex).getResult()
-                .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
-              throw new OMException(ex.getMessage(),
-                  ResultCodes.SCM_IN_CHILL_MODE);
-            }
-          }
-          throw ex;
-        }
-        OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
-            .setBlockID(new BlockID(allocatedBlock.getBlockID()))
-            .setLength(allocateSize)
-            .setOffset(0);
-        if (grpcBlockTokenEnabled) {
-          String remoteUser = getRemoteUser().getShortUserName();
-          builder.setToken(secretManager.generateToken(remoteUser,
-              allocatedBlock.getBlockID().toString(),
-              getAclForUser(remoteUser),
-              scmBlockSize));
-        }
-
-        OmKeyLocationInfo subKeyInfo = builder.build();
-        locations.add(subKeyInfo);
-        requestedSize -= allocateSize;
-      }
       // NOTE size of a key is not a hard limit on anything, it is a value that
       // client should expect, in terms of current size of key. If client sets a
       // value, then this value is used, otherwise, we allocate a single block
       // which is the current size, if read by the client.
       long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
-      OmKeyInfo keyInfo;
-      long openVersion;
 
       if (args.getIsMultipartKey()) {
         // For this upload part we don't need to check in KeyTable. As this
@@ -449,7 +424,7 @@ public class KeyManagerImpl implements KeyManager {
           openVersion = 0;
         }
       }
-      String openKey = metadataManager.getOpenKey(
+      openKey = metadataManager.getOpenKey(
           volumeName, bucketName, keyName, currentTime);
       if (metadataManager.getOpenKeyTable().get(openKey) != null) {
         // This should not happen. If this condition is satisfied, it means
@@ -464,10 +439,8 @@ public class KeyManagerImpl implements KeyManager {
         throw new OMException("Cannot allocate key. Not able to get a valid" +
             "open key id.", ResultCodes.KEY_ALLOCATION_ERROR);
       }
-      metadataManager.getOpenKeyTable().put(openKey, keyInfo);
       LOG.debug("Key {} allocated in volume {} bucket {}",
           keyName, volumeName, bucketName);
-      return new OpenKeySession(currentTime, keyInfo, openVersion);
     } catch (OMException e) {
       throw e;
     } catch (IOException ex) {
@@ -478,6 +451,19 @@ public class KeyManagerImpl implements KeyManager {
     } finally {
       metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
+
+    // requested size is not required but more like a optimization:
+    // SCM looks at the requested, if it 0, no block will be allocated at
+    // the point, if client needs more blocks, client can always call
+    // allocateBlock. But if requested size is not 0, OM will preallocate
+    // some blocks and piggyback to client, to save RPC calls.
+    if (requestedSize > 0) {
+      List<OmKeyLocationInfo> locationInfos =
+          allocateBlock(keyInfo, new ExcludeList(), requestedSize);
+      keyInfo.appendNewBlocks(locationInfos);
+    }
+    metadataManager.getOpenKeyTable().put(openKey, keyInfo);
+    return new OpenKeySession(currentTime, keyInfo, openVersion);
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 992ccaf..a76d052 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -18,177 +18,141 @@
 
 package org.apache.hadoop.ozone.om;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec;
-import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec;
-import org.apache.hadoop.ozone.om.codec.OmVolumeArgsCodec;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.ozone.om.helpers.*;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.db.CodecRegistry;
-import org.apache.hadoop.utils.db.RDBStore;
-import org.apache.hadoop.utils.db.Table;
-import org.apache.hadoop.utils.db.TableConfig;
 
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-import org.rocksdb.RocksDB;
-import org.rocksdb.Statistics;
-import org.rocksdb.StatsLevel;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
 
 /**
  * Test class for @{@link KeyManagerImpl}.
- * */
+ */
 public class TestKeyManagerImpl {
 
   private static KeyManagerImpl keyManager;
-  private static ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private static VolumeManagerImpl volumeManager;
+  private static BucketManagerImpl bucketManager;
+  private static StorageContainerManager scm;
+  private static ScmBlockLocationProtocol mockScmBlockLocationProtocol;
   private static OzoneConfiguration conf;
   private static OMMetadataManager metadataManager;
-  private static long blockSize = 1000;
+  private static File dir;
+  private static long scmBlockSize;
   private static final String KEY_NAME = "key1";
   private static final String BUCKET_NAME = "bucket1";
   private static final String VOLUME_NAME = "vol1";
-  private static RDBStore rdbStore = null;
-  private static Table<String, OmKeyInfo> keyTable = null;
-  private static Table<String, OmBucketInfo> bucketTable = null;
-  private static Table<String, OmVolumeArgs> volumeTable = null;
-  private static DBOptions options = null;
-  private KeyInfo keyData;
-  @Rule
-  public TemporaryFolder folder = new TemporaryFolder();
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void setUp() throws Exception {
     conf = new OzoneConfiguration();
-    scmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
-    metadataManager = Mockito.mock(OMMetadataManager.class);
-    keyManager = new KeyManagerImpl(scmBlockLocationProtocol, metadataManager,
-        conf, "om1", null);
-    setupMocks();
-  }
-
-  private void setupMocks() throws Exception {
-    Mockito.when(scmBlockLocationProtocol
+    dir = GenericTestUtils.getRandomizedTestDir();
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
+    mockScmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
+    metadataManager = new OmMetadataManagerImpl(conf);
+    volumeManager = new VolumeManagerImpl(metadataManager, conf);
+    bucketManager = new BucketManagerImpl(metadataManager);
+    NodeManager nodeManager = new MockNodeManager(true, 10);
+    SCMConfigurator configurator = new SCMConfigurator();
+    configurator.setScmNodeManager(nodeManager);
+    scm = TestUtils.getScm(conf, configurator);
+    scm.start();
+    scm.exitChillMode();
+    scmBlockSize = (long) conf
+        .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
+            StorageUnit.BYTES);
+    conf.setLong(OZONE_KEY_PREALLOCATION_MAXSIZE, scmBlockSize * 10);
+
+    keyManager =
+        new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf,
+            "om1", null);
+    Mockito.when(mockScmBlockLocationProtocol
         .allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class),
             Mockito.any(ReplicationFactor.class), Mockito.anyString(),
-            Mockito.any(ExcludeList.class)))
-        .thenThrow(
-            new SCMException("ChillModePrecheck failed for allocateBlock",
-                ResultCodes.CHILL_MODE_EXCEPTION));
-    setupRocksDb();
-    Mockito.when(metadataManager.getVolumeTable()).thenReturn(volumeTable);
-    Mockito.when(metadataManager.getBucketTable()).thenReturn(bucketTable);
-    Mockito.when(metadataManager.getOpenKeyTable()).thenReturn(keyTable);
-    Mockito.when(metadataManager.getLock())
-        .thenReturn(new OzoneManagerLock(conf));
-    Mockito.when(metadataManager.getVolumeKey(VOLUME_NAME))
-        .thenReturn(VOLUME_NAME);
-    Mockito.when(metadataManager.getBucketKey(VOLUME_NAME, BUCKET_NAME))
-        .thenReturn(BUCKET_NAME);
-    Mockito.when(metadataManager.getOpenKey(VOLUME_NAME, BUCKET_NAME,
-        KEY_NAME, 1)).thenReturn(KEY_NAME);
+            Mockito.any(ExcludeList.class))).thenThrow(
+        new SCMException("ChillModePrecheck failed for allocateBlock",
+            ResultCodes.CHILL_MODE_EXCEPTION));
+    createVolume(VOLUME_NAME);
+    createBucket(VOLUME_NAME, BUCKET_NAME);
   }
 
-  private void setupRocksDb() throws Exception {
-    options = new DBOptions();
-    options.setCreateIfMissing(true);
-    options.setCreateMissingColumnFamilies(true);
-
-    Statistics statistics = new Statistics();
-    statistics.setStatsLevel(StatsLevel.ALL);
-    options = options.setStatistics(statistics);
+  @AfterClass
+  public static void cleanup() throws Exception {
+    scm.stop();
+    scm.join();
+    metadataManager.stop();
+    keyManager.stop();
+    FileUtils.deleteDirectory(dir);
+  }
 
-    Set<TableConfig> configSet = new HashSet<>();
-    for (String name : Arrays
-        .asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
-            "testKeyTable", "testBucketTable", "testVolumeTable")) {
-      TableConfig newConfig = new TableConfig(name, new ColumnFamilyOptions());
-      configSet.add(newConfig);
-    }
-    keyData = KeyInfo.newBuilder()
-        .setKeyName(KEY_NAME)
-        .setBucketName(BUCKET_NAME)
-        .setVolumeName(VOLUME_NAME)
-        .setDataSize(blockSize)
-        .setType(ReplicationType.STAND_ALONE)
-        .setFactor(ReplicationFactor.ONE)
-        .setCreationTime(Time.now())
-        .setModificationTime(Time.now())
+  private static void createBucket(String volumeName, String bucketName)
+      throws IOException {
+    OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
         .build();
+    bucketManager.createBucket(bucketInfo);
+  }
 
-    CodecRegistry registry = new CodecRegistry();
-    registry.addCodec(OmKeyInfo.class, new OmKeyInfoCodec());
-    registry.addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec());
-    registry.addCodec(OmBucketInfo.class, new OmBucketInfoCodec());
-    rdbStore = new RDBStore(folder.newFolder(), options, configSet, registry);
-
-    keyTable =
-        rdbStore.getTable("testKeyTable", String.class, OmKeyInfo.class);
-
-    bucketTable =
-        rdbStore.getTable("testBucketTable", String.class, OmBucketInfo.class);
-
-    volumeTable =
-        rdbStore.getTable("testVolumeTable", String.class, OmVolumeArgs.class);
-
-    volumeTable.put(VOLUME_NAME, OmVolumeArgs.newBuilder()
-        .setAdminName("a")
-        .setOwnerName("o")
-        .setVolume(VOLUME_NAME)
-        .build());
-
-    bucketTable.put(BUCKET_NAME,
-        new OmBucketInfo.Builder().setBucketName(BUCKET_NAME)
-            .setVolumeName(VOLUME_NAME).build());
-
-    keyTable.put(KEY_NAME, new OmKeyInfo.Builder()
-        .setVolumeName(VOLUME_NAME)
-        .setBucketName(BUCKET_NAME)
-        .setKeyName(KEY_NAME)
-        .setReplicationType(ReplicationType.STAND_ALONE)
-        .setReplicationFactor(ReplicationFactor.THREE)
-        .build());
-
+  private static void createVolume(String volumeName) throws IOException {
+    OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
+        .setVolume(volumeName)
+        .setAdminName("bilbo")
+        .setOwnerName("bilbo")
+        .build();
+    volumeManager.createVolume(volumeArgs);
   }
 
   @Test
   public void allocateBlockFailureInChillMode() throws Exception {
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME)
+    KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
+        metadataManager, conf, "om1", null);
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setKeyName(KEY_NAME)
         .setBucketName(BUCKET_NAME)
         .setFactor(ReplicationFactor.ONE)
         .setType(ReplicationType.STAND_ALONE)
-        .setVolumeName(VOLUME_NAME).build();
+        .setVolumeName(VOLUME_NAME)
+        .build();
+    OpenKeySession keySession = keyManager1.openKey(keyArgs);
     LambdaTestUtils.intercept(OMException.class,
         "ChillModePrecheck failed for allocateBlock", () -> {
-          keyManager.allocateBlock(keyArgs, 1, new ExcludeList());
+          keyManager1
+              .allocateBlock(keyArgs, keySession.getId(), new ExcludeList());
         });
   }
 
   @Test
   public void openKeyFailureInChillMode() throws Exception {
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME)
+    KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
+        metadataManager, conf, "om1", null);
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setKeyName(KEY_NAME)
         .setBucketName(BUCKET_NAME)
         .setFactor(ReplicationFactor.ONE)
         .setDataSize(1000)
@@ -196,7 +160,23 @@ public class TestKeyManagerImpl {
         .setVolumeName(VOLUME_NAME).build();
     LambdaTestUtils.intercept(OMException.class,
         "ChillModePrecheck failed for allocateBlock", () -> {
-          keyManager.openKey(keyArgs);
+          keyManager1.openKey(keyArgs);
         });
   }
+
+  @Test
+  public void openKeyWithMultipleBlocks() throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setKeyName(UUID.randomUUID().toString())
+        .setBucketName(BUCKET_NAME)
+        .setFactor(ReplicationFactor.ONE)
+        .setDataSize(scmBlockSize * 10)
+        .setType(ReplicationType.STAND_ALONE)
+        .setVolumeName(VOLUME_NAME)
+        .build();
+    OpenKeySession keySession = keyManager.openKey(keyArgs);
+    OmKeyInfo keyInfo = keySession.getKeyInfo();
+    Assert.assertEquals(10,
+        keyInfo.getLatestVersionLocations().getLocationList().size());
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org