Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/05/23 17:12:49 UTC
[01/18] hadoop git commit: HDDS-89. Addendum Patch. Create ozone specific inline documentation as part of the build. Contributed by Elek, Marton.
Repository: hadoop
Updated Branches:
refs/heads/HDDS-48 60821fb20 -> 699a6918a
HDDS-89. Addendum Patch. Create ozone specific inline documentation as part of the build.
Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/523f602f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/523f602f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/523f602f
Branch: refs/heads/HDDS-48
Commit: 523f602f81eafd56e4adfadd70d7c9a672b5813a
Parents: 60821fb
Author: Anu Engineer <ae...@apache.org>
Authored: Tue May 22 13:20:42 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue May 22 13:20:42 2018 -0700
----------------------------------------------------------------------
hadoop-dist/pom.xml | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/523f602f/hadoop-dist/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-dist/pom.xml b/hadoop-dist/pom.xml
index 41e040f..dfbf818 100644
--- a/hadoop-dist/pom.xml
+++ b/hadoop-dist/pom.xml
@@ -68,13 +68,6 @@
<artifactId>hadoop-client-integration-tests</artifactId>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-ozone-docs</artifactId>
- <scope>provided</scope>
- </dependency>
-
-
</dependencies>
@@ -267,6 +260,11 @@
<artifactId>hadoop-ozone-tools</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-ozone-docs</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
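For context, the net effect of this addendum is to move the hadoop-ozone-docs dependency out of the module's top-level <dependencies> block and re-add it further down, inside the dependency list that is only active for the Ozone/HDDS build. A minimal sketch of the assumed resulting layout in hadoop-dist/pom.xml (the enclosing profile id shown is an assumption, not taken from the patch):

<profiles>
  <profile>
    <id>hdds</id>
    <dependencies>
      <!-- ... other ozone dependencies ... -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-ozone-docs</artifactId>
        <scope>provided</scope>
      </dependency>
    </dependencies>
  </profile>
</profiles>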
[12/18] hadoop git commit: HDFS-13540. DFSStripedInputStream should only allocate new buffers when reading. Contributed by Xiao Chen.
Posted by ha...@apache.org.
HDFS-13540. DFSStripedInputStream should only allocate new buffers when reading. Contributed by Xiao Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34e8b9f9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34e8b9f9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34e8b9f9
Branch: refs/heads/HDDS-48
Commit: 34e8b9f9a86fb03156861482643fba11bdee1dd4
Parents: fed2bef
Author: Sammi Chen <sa...@intel.com>
Authored: Wed May 23 19:10:09 2018 +0800
Committer: Sammi Chen <sa...@intel.com>
Committed: Wed May 23 19:10:09 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/io/ElasticByteBufferPool.java | 12 ++++++
.../hadoop/hdfs/DFSStripedInputStream.java | 12 +++---
.../hadoop/hdfs/TestDFSStripedInputStream.java | 45 ++++++++++++++++++++
3 files changed, 64 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
index 023f37f..9dd7771 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
@@ -116,4 +116,16 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
// poor granularity.
}
}
+
+ /**
+ * Get the size of the buffer pool, for the specified buffer type.
+ *
+ * @param direct Whether the size is returned for direct buffers
+ * @return The size
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public int size(boolean direct) {
+ return getBufferTree(direct).size();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index f3b16e0..5557a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -116,12 +116,14 @@ public class DFSStripedInputStream extends DFSInputStream {
return decoder.preferDirectBuffer();
}
- void resetCurStripeBuffer() {
- if (curStripeBuf == null) {
+ private void resetCurStripeBuffer(boolean shouldAllocateBuf) {
+ if (shouldAllocateBuf && curStripeBuf == null) {
curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
cellSize * dataBlkNum);
}
- curStripeBuf.clear();
+ if (curStripeBuf != null) {
+ curStripeBuf.clear();
+ }
curStripeRange = new StripeRange(0, 0);
}
@@ -206,7 +208,7 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
@Override
protected void closeCurrentBlockReaders() {
- resetCurStripeBuffer();
+ resetCurStripeBuffer(false);
if (blockReaders == null || blockReaders.length == 0) {
return;
}
@@ -296,7 +298,7 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
private void readOneStripe(CorruptedBlocks corruptedBlocks)
throws IOException {
- resetCurStripeBuffer();
+ resetCurStripeBuffer(true);
// compute stripe range based on pos
final long offsetInBlockGroup = getOffsetInBlockGroup();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34e8b9f9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index cdebee0..422746e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@@ -529,4 +530,48 @@ public class TestDFSStripedInputStream {
}
}
}
+
+ @Test
+ public void testCloseDoesNotAllocateNewBuffer() throws Exception {
+ final int numBlocks = 2;
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ stripesPerBlock, false, ecPolicy);
+ try (DFSInputStream in = fs.getClient().open(filePath.toString())) {
+ assertTrue(in instanceof DFSStripedInputStream);
+ final DFSStripedInputStream stream = (DFSStripedInputStream) in;
+ final ElasticByteBufferPool ebbp =
+ (ElasticByteBufferPool) stream.getBufferPool();
+ // first clear existing pool
+ LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: "
+ + ebbp.size(false));
+ emptyBufferPoolForCurrentPolicy(ebbp, true);
+ emptyBufferPoolForCurrentPolicy(ebbp, false);
+ final int startSizeDirect = ebbp.size(true);
+ final int startSizeIndirect = ebbp.size(false);
+ // close should not allocate new buffers in the pool.
+ stream.close();
+ assertEquals(startSizeDirect, ebbp.size(true));
+ assertEquals(startSizeIndirect, ebbp.size(false));
+ }
+ }
+
+ /**
+ * Empties the pool for the specified buffer type, for the current ecPolicy.
+ * <p>
+ * Note that {@link #ecPolicy} may change for different test cases in
+ * {@link TestDFSStripedInputStreamWithRandomECPolicy}.
+ */
+ private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp,
+ boolean direct) {
+ int size;
+ while ((size = ebbp.size(direct)) != 0) {
+ ebbp.getBuffer(direct,
+ ecPolicy.getCellSize() * ecPolicy.getNumDataUnits());
+ if (size == ebbp.size(direct)) {
+ // if getBuffer didn't decrease size, it means the pool for the buffer
+ // corresponding to current ecPolicy is empty
+ break;
+ }
+ }
+ }
}
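The test above relies on the new ElasticByteBufferPool.size(boolean) accessor to observe the pool from outside. A minimal standalone sketch of the pool API as used here (getBuffer/putBuffer are pre-existing; size is the accessor added by this patch; the class name below is illustrative):

import java.nio.ByteBuffer;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class BufferPoolSizeSketch {
  public static void main(String[] args) {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    // Take a heap (indirect) buffer of at least 4 KB out of the pool.
    ByteBuffer buf = pool.getBuffer(false, 4096);
    // Returning it makes it visible to the size accounting again.
    pool.putBuffer(buf);
    // size(direct) reports how many buffers of that type the pool holds.
    System.out.println("direct: " + pool.size(true)
        + ", indirect: " + pool.size(false));
  }
}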
[18/18] hadoop git commit: HDDS-110. Checkstyle is not working in the HDDS precommit hook. Contributed by Elek, Marton.
Posted by ha...@apache.org.
HDDS-110. Checkstyle is not working in the HDDS precommit hook.
Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/699a6918
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/699a6918
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/699a6918
Branch: refs/heads/HDDS-48
Commit: 699a6918aca2b57ae9ad0bff2c3aaf5a776da614
Parents: c0c9b7a
Author: Anu Engineer <ae...@apache.org>
Authored: Wed May 23 09:42:21 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Wed May 23 10:01:53 2018 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/699a6918/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0e7b23a..13f9255 100644
--- a/pom.xml
+++ b/pom.xml
@@ -322,7 +322,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-build-tools</artifactId>
- <version>${project.version}</version>
+ <version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
[02/18] hadoop git commit: YARN-8332. Incorrect min/max allocation property name in resource types doc. (Weiwei Yang via wangda)
Posted by ha...@apache.org.
YARN-8332. Incorrect min/max allocation property name in resource types doc. (Weiwei Yang via wangda)
Change-Id: If74f1ceed9c045a2cb2d6593741278b65ac44a9f
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/83f53e5c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/83f53e5c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/83f53e5c
Branch: refs/heads/HDDS-48
Commit: 83f53e5c6236de30c213dc41878cebfb02597e26
Parents: bd15d23
Author: Wangda Tan <wa...@apache.org>
Authored: Tue May 22 13:29:21 2018 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue May 22 13:33:33 2018 -0700
----------------------------------------------------------------------
.../hadoop-yarn-site/src/site/markdown/ResourceModel.md | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83f53e5c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceModel.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceModel.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceModel.md
index f968b5f..ac16d53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceModel.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceModel.md
@@ -49,8 +49,8 @@ The following configuration properties are supported. See below for details.
|:---- |:---- |
| `yarn.resource-types` | Comma-separated list of additional resources. May not include `memory`, `memory-mb`, or `vcores` |
| `yarn.resource-types.<resource>.units` | Default unit for the specified resource type |
-| `yarn.resource-types.<resource>.minimum` | The minimum request for the specified resource type |
-| `yarn.resource-types.<resource>.maximum` | The maximum request for the specified resource type |
+| `yarn.resource-types.<resource>.minimum-allocation` | The minimum request for the specified resource type |
+| `yarn.resource-types.<resource>.maximum-allocation` | The maximum request for the specified resource type |
`node-resources.xml`
@@ -127,8 +127,8 @@ set the default unit for the resource type. Valid values are:
The property must be named `yarn.resource-types.<resource>.units`. Each defined
resource may also have optional minimum and maximum properties. The properties
-must be named `yarn.resource-types.<resource>.minimum` and
-`yarn.resource-types.<resource>.maximum`.
+must be named `yarn.resource-types.<resource>.minimum-allocation` and
+`yarn.resource-types.<resource>.maximum-allocation`.
The `yarn.resource-types` property and any unit, minimum, or maximum properties
may be defined in either the usual `yarn-site.xml` file or in a file named
@@ -147,12 +147,12 @@ may be defined in either the usual `yarn-site.xml` file or in a file named
</property>
<property>
- <name>yarn.resource-types.resource2.minimum</name>
+ <name>yarn.resource-types.resource2.minimum-allocation</name>
<value>1</value>
</property>
<property>
- <name>yarn.resource-types.resource2.maximum</name>
+ <name>yarn.resource-types.resource2.maximum-allocation</name>
<value>1024</value>
</property>
</configuration>
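Putting the corrected property names together, a complete configuration for one custom resource could look like the sketch below (the resource name, unit, and values are illustrative, not taken from the patch):

<configuration>
  <property>
    <name>yarn.resource-types</name>
    <value>resource2</value>
  </property>
  <property>
    <name>yarn.resource-types.resource2.units</name>
    <value>G</value>
  </property>
  <property>
    <name>yarn.resource-types.resource2.minimum-allocation</name>
    <value>1</value>
  </property>
  <property>
    <name>yarn.resource-types.resource2.maximum-allocation</name>
    <value>1024</value>
  </property>
</configuration>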
[13/18] hadoop git commit: YARN-8285. Remove unused environment variables from the Docker runtime. Contributed by Eric Badger
Posted by ha...@apache.org.
YARN-8285. Remove unused environment variables from the Docker runtime. Contributed by Eric Badger
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9837ca9c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9837ca9c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9837ca9c
Branch: refs/heads/HDDS-48
Commit: 9837ca9cc746573571029f9fb996a1be10b588ab
Parents: 34e8b9f
Author: Shane Kumpf <sk...@apache.org>
Authored: Wed May 23 06:43:44 2018 -0600
Committer: Shane Kumpf <sk...@apache.org>
Committed: Wed May 23 06:43:44 2018 -0600
----------------------------------------------------------------------
.../linux/runtime/DockerLinuxContainerRuntime.java | 9 ---------
1 file changed, 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9837ca9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
index 787e892..e131e9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
@@ -106,9 +106,6 @@ import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.r
* will be used to launch the Docker container.
* </li>
* <li>
- * {@code YARN_CONTAINER_RUNTIME_DOCKER_IMAGE_FILE} is currently ignored.
- * </li>
- * <li>
* {@code YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE} controls
* whether the Docker container's default command is overridden. When set
* to {@code true}, the Docker container's command will be
@@ -198,9 +195,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
public static final String ENV_DOCKER_CONTAINER_IMAGE =
"YARN_CONTAINER_RUNTIME_DOCKER_IMAGE";
@InterfaceAudience.Private
- public static final String ENV_DOCKER_CONTAINER_IMAGE_FILE =
- "YARN_CONTAINER_RUNTIME_DOCKER_IMAGE_FILE";
- @InterfaceAudience.Private
public static final String ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE =
"YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE";
@InterfaceAudience.Private
@@ -216,9 +210,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
public static final String ENV_DOCKER_CONTAINER_RUN_PRIVILEGED_CONTAINER =
"YARN_CONTAINER_RUNTIME_DOCKER_RUN_PRIVILEGED_CONTAINER";
@InterfaceAudience.Private
- public static final String ENV_DOCKER_CONTAINER_RUN_ENABLE_USER_REMAPPING =
- "YARN_CONTAINER_RUNTIME_DOCKER_RUN_ENABLE_USER_REMAPPING";
- @InterfaceAudience.Private
public static final String ENV_DOCKER_CONTAINER_MOUNTS =
"YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS";
@InterfaceAudience.Private
[08/18] hadoop git commit: HDDS-49. Standalone protocol should use grpc in place of netty. Contributed by Mukul Kumar Singh.
Posted by ha...@apache.org.
HDDS-49. Standalone protocol should use grpc in place of netty.
Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a914069
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a914069
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a914069
Branch: refs/heads/HDDS-48
Commit: 5a9140690aba295ba1226a3190b52f34347a8372
Parents: 3e5f7ea
Author: Anu Engineer <ae...@apache.org>
Authored: Tue May 22 16:51:43 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue May 22 19:56:15 2018 -0700
----------------------------------------------------------------------
.../hadoop/hdds/scm/XceiverClientGrpc.java | 217 +++++++++++++++++++
.../hadoop/hdds/scm/XceiverClientManager.java | 21 +-
.../hadoop/hdds/scm/XceiverClientMetrics.java | 8 +-
.../common/dev-support/findbugsExcludeFile.xml | 3 +
hadoop-hdds/common/pom.xml | 17 ++
.../apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 +
.../main/proto/DatanodeContainerProtocol.proto | 7 +
.../common/src/main/resources/ozone-default.xml | 9 +
.../common/helpers/ContainerMetrics.java | 14 +-
.../transport/server/GrpcXceiverService.java | 82 +++++++
.../transport/server/XceiverServerGrpc.java | 105 +++++++++
.../container/ozoneimpl/OzoneContainer.java | 11 +-
.../hadoop/ozone/MiniOzoneClusterImpl.java | 10 +-
.../ozone/scm/TestXceiverClientManager.java | 67 ++++--
hadoop-project/pom.xml | 1 +
15 files changed, 540 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
new file mode 100644
index 0000000..84790e8
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Client for the storageContainer protocol.
+ */
+public class XceiverClientGrpc extends XceiverClientSpi {
+ static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
+ private final Pipeline pipeline;
+ private final Configuration config;
+ private XceiverClientProtocolServiceStub asyncStub;
+ private XceiverClientMetrics metrics;
+ private ManagedChannel channel;
+ private final Semaphore semaphore;
+
+ /**
+ * Constructs a client that can communicate with the Container framework on
+ * data nodes.
+ *
+ * @param pipeline - Pipeline that defines the machines.
+ * @param config -- Ozone Config
+ */
+ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
+ super();
+ Preconditions.checkNotNull(pipeline);
+ Preconditions.checkNotNull(config);
+ this.pipeline = pipeline;
+ this.config = config;
+ this.semaphore =
+ new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
+ this.metrics = XceiverClientManager.getXceiverClientMetrics();
+ }
+
+ @Override
+ public void connect() throws Exception {
+ DatanodeDetails leader = this.pipeline.getLeader();
+
+ // read port from the data node, on failure use default configured
+ // port.
+ int port = leader.getContainerPort();
+ if (port == 0) {
+ port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+ }
+ LOG.debug("Connecting to server Port : " + leader.getIpAddress());
+ channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
+ .usePlaintext(true)
+ .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+ .build();
+ asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
+ }
+
+ /**
+ * Returns whether the xceiver client is connected to a server.
+ *
+ * @return True if the connection is alive, false otherwise.
+ */
+ @VisibleForTesting
+ public boolean isConnected() {
+ return !channel.isTerminated() && !channel.isShutdown();
+ }
+
+ @Override
+ public void close() {
+ channel.shutdownNow();
+ try {
+ channel.awaitTermination(60, TimeUnit.MINUTES);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception while waiting for channel termination",
+ e);
+ }
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ @Override
+ public ContainerCommandResponseProto sendCommand(
+ ContainerCommandRequestProto request) throws IOException {
+ try {
+ return sendCommandAsync(request).get();
+ } catch (ExecutionException | InterruptedException e) {
+ /**
+ * In case the grpc channel handler throws an exception,
+ * the exception thrown will be wrapped within {@link ExecutionException}.
+ * Unwrapping here so that the original exception gets passed
+ * to the client.
+ */
+ if (e instanceof ExecutionException) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ }
+ throw new IOException(
+ "Unexpected exception during execution:" + e.getMessage());
+ }
+ }
+
+ /**
+ * Sends a given command to the server and gets a waitable future back.
+ *
+ * @param request Request
+ * @return Response to the command
+ * @throws IOException
+ */
+ @Override
+ public CompletableFuture<ContainerCommandResponseProto>
+ sendCommandAsync(ContainerCommandRequestProto request)
+ throws IOException, ExecutionException, InterruptedException {
+ final CompletableFuture<ContainerCommandResponseProto> replyFuture =
+ new CompletableFuture<>();
+ semaphore.acquire();
+ long requestTime = Time.monotonicNowNanos();
+ metrics.incrPendingContainerOpsMetrics(request.getCmdType());
+ // create a new grpc stream for each non-async call.
+ final StreamObserver<ContainerCommandRequestProto> requestObserver =
+ asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
+ @Override
+ public void onNext(ContainerCommandResponseProto value) {
+ replyFuture.complete(value);
+ metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+ metrics.addContainerOpsLatency(request.getCmdType(),
+ Time.monotonicNowNanos() - requestTime);
+ semaphore.release();
+ }
+ @Override
+ public void onError(Throwable t) {
+ replyFuture.completeExceptionally(t);
+ metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+ metrics.addContainerOpsLatency(request.getCmdType(),
+ Time.monotonicNowNanos() - requestTime);
+ semaphore.release();
+ }
+
+ @Override
+ public void onCompleted() {
+ if (!replyFuture.isDone()) {
+ replyFuture.completeExceptionally(
+ new IOException("Stream completed but no reply for request "
+ + request));
+ }
+ }
+ });
+ requestObserver.onNext(request);
+ requestObserver.onCompleted();
+ return replyFuture;
+ }
+
+ /**
+ * Create a pipeline.
+ *
+ * @param pipelineID - Name of the pipeline.
+ * @param datanodes - Datanodes
+ */
+ @Override
+ public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
+ throws IOException {
+ // For a stand-alone pipeline, there is no separate pipeline setup step.
+ return;
+ }
+
+ /**
+ * Returns pipeline Type.
+ *
+ * @return - Stand Alone as the type.
+ */
+ @Override
+ public HddsProtos.ReplicationType getPipelineType() {
+ return HddsProtos.ReplicationType.STAND_ALONE;
+ }
+}
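A minimal caller-side sketch of the new client (not part of the patch; the Pipeline and request are assumed to have been obtained from SCM by the caller, error handling is trimmed, and the class name is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;

public final class XceiverClientGrpcSketch {
  // Hypothetical helper, for illustration only: sends one container command
  // over a freshly connected grpc client and tears the channel down again.
  static ContainerCommandResponseProto sendOnce(Pipeline pipeline,
      Configuration conf, ContainerCommandRequestProto request)
      throws Exception {
    XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
    client.connect();                      // dials the pipeline leader
    try {
      return client.sendCommand(request);  // waits on the streamed reply
    } finally {
      client.close();                      // shuts the channel down
    }
  }
}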
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index dcaa576..8919797 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -41,8 +41,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
- .ReplicationType.RATIS;
/**
* XceiverClientManager is responsible for the lifecycle of XceiverClient
@@ -62,6 +60,7 @@ public class XceiverClientManager implements Closeable {
private final Configuration conf;
private final Cache<Long, XceiverClientSpi> clientCache;
private final boolean useRatis;
+ private final boolean useGrpc;
private static XceiverClientMetrics metrics;
/**
@@ -79,6 +78,8 @@ public class XceiverClientManager implements Closeable {
this.useRatis = conf.getBoolean(
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+ this.useGrpc = conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
this.conf = conf;
this.clientCache = CacheBuilder.newBuilder()
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
@@ -146,9 +147,19 @@ public class XceiverClientManager implements Closeable {
new Callable<XceiverClientSpi>() {
@Override
public XceiverClientSpi call() throws Exception {
- XceiverClientSpi client = pipeline.getType() == RATIS ?
- XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
- : new XceiverClient(pipeline, conf);
+ XceiverClientSpi client = null;
+ switch (pipeline.getType()) {
+ case RATIS:
+ client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
+ break;
+ case STAND_ALONE:
+ client = useGrpc ? new XceiverClientGrpc(pipeline, conf) :
+ new XceiverClient(pipeline, conf);
+ break;
+ case CHAINED:
+ default:
+ throw new IOException("not implemented: " + pipeline.getType());
+ }
client.connect();
return client;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
index fbc348c..a430400 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
@@ -49,13 +49,13 @@ public class XceiverClientMetrics {
this.containerOpsLatency = new MutableRate[numEnumEntries];
for (int i = 0; i < numEnumEntries; i++) {
pendingOpsArray[i] = registry.newCounter(
- "numPending" + ContainerProtos.Type.valueOf(i + 1),
- "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
+ "numPending" + ContainerProtos.Type.forNumber(i + 1),
+ "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops",
(long) 0);
containerOpsLatency[i] = registry.newRate(
- ContainerProtos.Type.valueOf(i + 1) + "Latency",
- "latency of " + ContainerProtos.Type.valueOf(i + 1)
+ ContainerProtos.Type.forNumber(i + 1) + "Latency",
+ "latency of " + ContainerProtos.Type.forNumber(i + 1)
+ " ops");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
index 3571a89..daf6fec 100644
--- a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
@@ -18,4 +18,7 @@
<Match>
<Package name="org.apache.hadoop.hdds.protocol.proto"/>
</Match>
+ <Match>
+ <Package name="org.apache.hadoop.hdds.protocol.datanode.proto"/>
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index 6310df1..a8a634c 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -61,6 +61,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>ratis-grpc</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
+ <dependency>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ <version>2.2.0</version>
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>org.rocksdb</groupId>
@@ -108,7 +114,15 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
+ <goal>compile-custom</goal>
+ <goal>test-compile-custom</goal>
</goals>
+ <configuration>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
</execution>
</executions>
</plugin>
@@ -122,6 +136,9 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<replace token="com.google.protobuf" value="org.apache.ratis.shaded.com.google.protobuf"
dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
</replace>
+ <replace token="io.grpc" value="org.apache.ratis.shaded.io.grpc"
+ dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
+ </replace>
</tasks>
</configuration>
<goals>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 29ccf30..85407e6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -49,6 +49,10 @@ public final class ScmConfigKeys {
= "dfs.container.ratis.enabled";
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
= false;
+ public static final String DFS_CONTAINER_GRPC_ENABLED_KEY
+ = "dfs.container.grpc.enabled";
+ public static final boolean DFS_CONTAINER_GRPC_ENABLED_DEFAULT
+ = false;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
= "dfs.container.ratis.rpc.type";
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 95b7cbb..1138297 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -24,6 +24,7 @@
// This file contains protocol buffers that are used to transfer data
// to and from the datanode.
+syntax = "proto2";
option java_package = "org.apache.hadoop.hdds.protocol.datanode.proto";
option java_outer_classname = "ContainerProtos";
option java_generate_equals_and_hash = true;
@@ -418,3 +419,9 @@ message CopyContainerResponseProto {
repeated bytes data = 5;
optional int64 checksum = 6;
}
+
+service XceiverClientProtocolService {
+ // A client-to-datanode RPC to send container commands
+ rpc send(stream ContainerCommandRequestProto) returns
+ (stream ContainerCommandResponseProto) {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index e0aca67..7a91610 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -72,6 +72,15 @@
</description>
</property>
<property>
+ <name>dfs.container.grpc.enabled</name>
+ <value>false</value>
+ <tag>OZONE, MANAGEMENT, PIPELINE, RATIS</tag>
+ <description>Ozone supports different kinds of replication pipeline
+ protocols. grpc is one of the replication pipeline protocols supported by
+ Ozone.
+ </description>
+ </property>
+ <property>
<name>dfs.container.ratis.ipc</name>
<value>9858</value>
<tag>OZONE, CONTAINER, PIPELINE, RATIS, MANAGEMENT</tag>
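To opt a cluster's standalone pipelines into the new transport, the flag above would be flipped in the site configuration; a sketch for ozone-site.xml (the key and its default come straight from the ScmConfigKeys change above):

<property>
  <name>dfs.container.grpc.enabled</name>
  <value>true</value>
</property>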
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
index 4300b2d..714db59 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -63,20 +63,20 @@ public class ContainerMetrics {
this.registry = new MetricsRegistry("StorageContainerMetrics");
for (int i = 0; i < numEnumEntries; i++) {
numOpsArray[i] = registry.newCounter(
- "num" + ContainerProtos.Type.valueOf(i + 1),
- "number of " + ContainerProtos.Type.valueOf(i + 1) + " ops",
+ "num" + ContainerProtos.Type.forNumber(i + 1),
+ "number of " + ContainerProtos.Type.forNumber(i + 1) + " ops",
(long) 0);
opsBytesArray[i] = registry.newCounter(
- "bytes" + ContainerProtos.Type.valueOf(i + 1),
- "bytes used by " + ContainerProtos.Type.valueOf(i + 1) + "op",
+ "bytes" + ContainerProtos.Type.forNumber(i + 1),
+ "bytes used by " + ContainerProtos.Type.forNumber(i + 1) + "op",
(long) 0);
opsLatency[i] = registry.newRate(
- "latency" + ContainerProtos.Type.valueOf(i + 1),
- ContainerProtos.Type.valueOf(i + 1) + " op");
+ "latency" + ContainerProtos.Type.forNumber(i + 1),
+ ContainerProtos.Type.forNumber(i + 1) + " op");
for (int j = 0; j < len; j++) {
int interval = intervals[j];
- String quantileName = ContainerProtos.Type.valueOf(i + 1) + "Nanos"
+ String quantileName = ContainerProtos.Type.forNumber(i + 1) + "Nanos"
+ interval + "s";
opsLatQuantiles[i][j] = registry.newQuantiles(quantileName,
"latency of Container ops", "ops", "latency", interval);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
new file mode 100644
index 0000000..df6220c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+ .XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Grpc Service for handling Container Commands on datanode.
+ */
+public class GrpcXceiverService extends
+ XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceImplBase {
+ public static final Logger
+ LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
+
+ private final ContainerDispatcher dispatcher;
+
+ public GrpcXceiverService(ContainerDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ @Override
+ public StreamObserver<ContainerCommandRequestProto> send(
+ StreamObserver<ContainerCommandResponseProto> responseObserver) {
+ return new StreamObserver<ContainerCommandRequestProto>() {
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ @Override
+ public void onNext(ContainerCommandRequestProto request) {
+ try {
+ ContainerCommandResponseProto resp = dispatcher.dispatch(request);
+ responseObserver.onNext(resp);
+ } catch (Throwable e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} got exception when processing"
+ + " ContainerCommandRequestProto {}: {}", request, e);
+ }
+ responseObserver.onError(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // for now we just log a message
+ LOG.info("ContainerCommand send on error", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (isClosed.compareAndSet(false, true)) {
+ LOG.info("{}: ContainerCommand send completed");
+ responseObserver.onCompleted();
+ }
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
new file mode 100644
index 0000000..30a2f87
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.shaded.io.grpc.Server;
+import org.apache.ratis.shaded.io.grpc.ServerBuilder;
+import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+
+/**
+ * Creates a Grpc server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServerGrpc implements XceiverServerSpi {
+ private static final Logger
+ LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
+ private int port;
+ private Server server;
+
+ /**
+ * Constructs a Grpc server class.
+ *
+ * @param conf - Configuration
+ */
+ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
+ ContainerDispatcher dispatcher) {
+ Preconditions.checkNotNull(conf);
+
+ this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+ // Get an available port on current node and
+ // use that as the container port
+ if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
+ try (ServerSocket socket = new ServerSocket()) {
+ socket.setReuseAddress(true);
+ SocketAddress address = new InetSocketAddress(0);
+ socket.bind(address);
+ this.port = socket.getLocalPort();
+ LOG.info("Found a free port for the server : {}", this.port);
+ } catch (IOException e) {
+ LOG.error("Unable find a random free port for the server, "
+ + "fallback to use default port {}", this.port, e);
+ }
+ }
+ datanodeDetails.setContainerPort(port);
+ server = ((NettyServerBuilder) ServerBuilder.forPort(port))
+ .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+ .addService(new GrpcXceiverService(dispatcher))
+ .build();
+ }
+
+ @Override
+ public int getIPCPort() {
+ return this.port;
+ }
+
+ /**
+ * Returns the Replication type supported by this end-point.
+ *
+ * @return enum -- {Stand_Alone, Ratis, Grpc, Chained}
+ */
+ @Override
+ public HddsProtos.ReplicationType getServerType() {
+ return HddsProtos.ReplicationType.STAND_ALONE;
+ }
+
+ @Override
+ public void start() throws IOException {
+ server.start();
+ }
+
+ @Override
+ public void stop() {
+ server.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 1fc79d7..b497cdc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -40,6 +41,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.background
.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server
+ .XceiverServerGrpc;
+import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.XceiverServerRatis;
@@ -121,8 +124,14 @@ public class OzoneContainer {
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
+ boolean useGrpc = this.ozoneConfig.getBoolean(
+ ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
server = new XceiverServerSpi[]{
- new XceiverServer(datanodeDetails, this.ozoneConfig, this.dispatcher),
+ useGrpc ? new XceiverServerGrpc(datanodeDetails,
+ this.ozoneConfig, this.dispatcher) :
+ new XceiverServer(datanodeDetails,
+ this.ozoneConfig, this.dispatcher),
XceiverServerRatis
.newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher)
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 08d7176..9936815 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -220,13 +220,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
datanodeService.stop();
datanodeService.join();
// ensure same ports are used across restarts.
- Configuration conf = datanodeService.getConf();
+ Configuration config = datanodeService.getConf();
int currentPort = datanodeService.getDatanodeDetails().getContainerPort();
- conf.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
- conf.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
+ config.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
+ config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
int ratisPort = datanodeService.getDatanodeDetails().getRatisPort();
- conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
- conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
+ config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
+ config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
datanodeService.start(null);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index 07ad6ef..77e4e1b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm;
import com.google.common.cache.Cache;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -30,13 +31,17 @@ import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.junit.Assert;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import static org.apache.hadoop.hdds.scm
.ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
@@ -44,19 +49,32 @@ import static org.apache.hadoop.hdds.scm
/**
* Test for XceiverClientManager caching and eviction.
*/
+@RunWith(Parameterized.class)
public class TestXceiverClientManager {
private static OzoneConfiguration config;
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static String containerOwner = "OZONE";
+ private static boolean shouldUseGrpc;
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> withGrpc() {
+ return Arrays.asList(new Object[][] {{false}, {true}});
+ }
+
+ public TestXceiverClientManager(boolean useGrpc) {
+ shouldUseGrpc = useGrpc;
+ }
@Rule
public ExpectedException exception = ExpectedException.none();
- @BeforeClass
- public static void init() throws Exception {
+ @Before
+ public void init() throws Exception {
config = new OzoneConfiguration();
+ config.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ shouldUseGrpc);
cluster = MiniOzoneCluster.newBuilder(config)
.setNumDatanodes(3)
.build();
@@ -65,8 +83,8 @@ public class TestXceiverClientManager {
.getStorageContainerLocationClient();
}
- @AfterClass
- public static void shutdown() {
+ @After
+ public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
@@ -76,6 +94,8 @@ public class TestXceiverClientManager {
@Test
public void testCaching() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerInfo container1 = storageContainerLocationClient
@@ -106,6 +126,8 @@ public class TestXceiverClientManager {
public void testFreeByReference() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
+ conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
@@ -140,10 +162,18 @@ public class TestXceiverClientManager {
// After releasing the client, this connection should be closed
// and any container operations should fail
clientManager.releaseClient(client1);
- exception.expect(IOException.class);
- exception.expectMessage("This channel is not connected.");
- ContainerProtocolCalls.createContainer(client1,
- container1.getContainerID(), traceID1);
+
+ String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
+ "This channel is not connected.";
+ try {
+ ContainerProtocolCalls.createContainer(client1,
+ container1.getContainerID(), traceID1);
+ Assert.fail("Create container should throw exception on closed"
+ + "client");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getClass(), IOException.class);
+ Assert.assertTrue(e.getMessage().contains(expectedMessage));
+ }
clientManager.releaseClient(client2);
}
@@ -151,6 +181,8 @@ public class TestXceiverClientManager {
public void testFreeByEviction() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
+ conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
@@ -181,10 +213,17 @@ public class TestXceiverClientManager {
// Any container operation should now fail
String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
- exception.expect(IOException.class);
- exception.expectMessage("This channel is not connected.");
- ContainerProtocolCalls.createContainer(client1,
- container1.getContainerID(), traceID2);
+ String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
+ "This channel is not connected.";
+ try {
+ ContainerProtocolCalls.createContainer(client1,
+ container1.getContainerID(), traceID2);
+ Assert.fail("Create container should throw exception on closed"
+ + "client");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getClass(), IOException.class);
+ Assert.assertTrue(e.getMessage().contains(expectedMessage));
+ }
clientManager.releaseClient(client2);
}
}
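Taken together, these hunks convert TestXceiverClientManager to JUnit 4's parameterized runner: the runner instantiates the class once per entry returned by the @Parameters method, and per-run setup moves from @BeforeClass/@AfterClass to @Before/@After. A minimal, self-contained sketch of the pattern (names and values are illustrative; an instance field is the usual idiom, whereas the patch routes the value through a static field so the @Before method can read it):

import java.util.Arrays;
import java.util.Collection;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TransportFlagSketchTest {

  private final boolean useGrpc;

  public TransportFlagSketchTest(boolean useGrpc) {
    this.useGrpc = useGrpc;
  }

  // Every @Test below runs once per entry: once with false, once with true.
  @Parameterized.Parameters
  public static Collection<Object[]> transports() {
    return Arrays.asList(new Object[][] {{false}, {true}});
  }

  @Test
  public void runsOncePerParameter() {
    // Placeholder assertion; a real test would exercise transport-specific
    // behaviour guarded by useGrpc.
    Assert.assertNotNull(Boolean.valueOf(useGrpc));
  }
}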
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index a916108..73c3f5b 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -107,6 +107,7 @@
<!-- Maven protoc compiler -->
<protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
<protobuf-compile.version>3.5.0</protobuf-compile.version>
+ <grpc.version>1.10.0</grpc.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
<!-- define the Java language version used by the compiler -->
[03/18] hadoop git commit: YARN-8290.
SystemMetricsPublisher.appACLsUpdated should be invoked after application
information is published to ATS to avoid "User is not set in the application
report" Exception. (Eric Yang via wangda)
Posted by ha...@apache.org.
YARN-8290. SystemMetricsPublisher.appACLsUpdated should be invoked after application information is published to ATS to avoid "User is not set in the application report" Exception. (Eric Yang via wangda)
Change-Id: I0ac6ddd19740d1aa7dd07111cd11af71ddc2fcaf
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bd15d239
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bd15d239
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bd15d239
Branch: refs/heads/HDDS-48
Commit: bd15d2396ef0c24fb6b60c6393d16b37651b828e
Parents: 523f602
Author: Wangda Tan <wa...@apache.org>
Authored: Tue May 22 13:25:15 2018 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue May 22 13:33:33 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/yarn/server/resourcemanager/RMAppManager.java | 5 -----
.../hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java | 5 +++++
.../hadoop/yarn/server/resourcemanager/TestAppManager.java | 4 ----
.../hadoop/yarn/server/resourcemanager/TestRMRestart.java | 1 +
4 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
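The fix below is purely an ordering change: appACLsUpdated may only be published once appCreated has written the application (including its user) to ATS. A minimal sketch of the intended order, using hypothetical stand-ins for the real SystemMetricsPublisher and RMApp types in the resourcemanager module:

// Hypothetical publisher interface; the real type is SystemMetricsPublisher.
interface Publisher {
  void appCreated(String appId, long when);
  void appACLsUpdated(String appId, String viewAcls, long when);
}

class AtsEventOrderSketch {
  // Publish creation first so the user field is set in the ATS report,
  // then publish the ACL update against the now-existing report.
  static void publish(Publisher p, String appId, String viewAcls) {
    long now = System.currentTimeMillis();
    p.appCreated(appId, now);               // writes the application report
    p.appACLsUpdated(appId, viewAcls, now); // safe: report already exists
  }
}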
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd15d239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 2983077..3e64cfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
@@ -466,10 +465,6 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
- String appViewACLs = submissionContext.getAMContainerSpec()
- .getApplicationACLs().get(ApplicationAccessType.VIEW_APP);
- rmContext.getSystemMetricsPublisher().appACLsUpdated(
- application, appViewACLs, System.currentTimeMillis());
return application;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd15d239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index daf14c4..6aee813 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -2020,6 +2021,10 @@ public class RMAppImpl implements RMApp, Recoverable {
private void sendATSCreateEvent() {
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
rmContext.getSystemMetricsPublisher().appCreated(this, this.startTime);
+ String appViewACLs = submissionContext.getAMContainerSpec()
+ .getApplicationACLs().get(ApplicationAccessType.VIEW_APP);
+ rmContext.getSystemMetricsPublisher().appACLsUpdated(
+ this, appViewACLs, System.currentTimeMillis());
}
@Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd15d239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index f6cdfec..e79ba08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -856,9 +856,6 @@ public class TestAppManager{
Assert.assertNotNull("app is null", app);
Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
- verify(metricsPublisher).appACLsUpdated(
- any(RMApp.class), any(String.class), anyLong());
-
// wait for event to be processed
int timeoutSecs = 0;
while ((getAppEventType() == RMAppEventType.KILL) &&
@@ -867,7 +864,6 @@ public class TestAppManager{
}
Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
getAppEventType());
-
return app;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd15d239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index a4f18ee..07c5268 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -766,6 +766,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+ Assert.assertEquals(app0.getUser(), loadedApp0.getUser());
// no new attempt is created.
Assert.assertEquals(1, loadedApp0.getAppAttempts().size());
[05/18] hadoop git commit: HDDS-89. Addendum Patch-1. Create ozone
specific inline documentation as part of the build. Contributed by Elek,
Marton.
Posted by ha...@apache.org.
HDDS-89. Addendum Patch-1. Create ozone specific inline documentation as part of the build.
Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/43be9ab4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/43be9ab4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/43be9ab4
Branch: refs/heads/HDDS-48
Commit: 43be9ab44f27ae847e100efdc6810b192202fc55
Parents: b22f56c
Author: Anu Engineer <ae...@apache.org>
Authored: Tue May 22 14:29:06 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue May 22 14:29:06 2018 -0700
----------------------------------------------------------------------
hadoop-ozone/docs/dev-support/bin/generate-site.sh | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43be9ab4/hadoop-ozone/docs/dev-support/bin/generate-site.sh
----------------------------------------------------------------------
diff --git a/hadoop-ozone/docs/dev-support/bin/generate-site.sh b/hadoop-ozone/docs/dev-support/bin/generate-site.sh
index 3323935..374e74b 100755
--- a/hadoop-ozone/docs/dev-support/bin/generate-site.sh
+++ b/hadoop-ozone/docs/dev-support/bin/generate-site.sh
@@ -19,7 +19,7 @@ DOCDIR="$DIR/../.."
if [ ! "$(which hugo)" ]; then
echo "Hugo is not yet installed. Doc generation is skipped."
- exit -1
+ exit 0
fi
DESTDIR="$DOCDIR/target/classes/webapps/docs"
[16/18] hadoop git commit: HDDS-44. Ozone: start-ozone.sh fails to
start datanode because of incomplete classpaths. Contributed by Mukul Kumar
Singh.
Posted by ha...@apache.org.
HDDS-44. Ozone: start-ozone.sh fails to start datanode because of incomplete classpaths.
Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e83b943f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e83b943f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e83b943f
Branch: refs/heads/HDDS-48
Commit: e83b943fed53c8082a699e0601c2f8e8db0f8ffe
Parents: 63fc587
Author: Anu Engineer <ae...@apache.org>
Authored: Wed May 23 09:29:35 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Wed May 23 09:29:35 2018 -0700
----------------------------------------------------------------------
hadoop-ozone/common/src/main/bin/start-ozone.sh | 116 ++++++++++++++++++-
1 file changed, 111 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e83b943f/hadoop-ozone/common/src/main/bin/start-ozone.sh
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/bin/start-ozone.sh b/hadoop-ozone/common/src/main/bin/start-ozone.sh
index dda0a1c..92bc4a8 100644
--- a/hadoop-ozone/common/src/main/bin/start-ozone.sh
+++ b/hadoop-ozone/common/src/main/bin/start-ozone.sh
@@ -47,6 +47,26 @@ else
exit 1
fi
+# get arguments
+if [[ $# -ge 1 ]]; then
+ startOpt="$1"
+ shift
+ case "$startOpt" in
+ -upgrade)
+ nameStartOpt="$startOpt"
+ ;;
+ -rollback)
+ dataStartOpt="$startOpt"
+ ;;
+ *)
+ hadoop_exit_with_usage 1
+ ;;
+ esac
+fi
+
+#Add other possible options
+nameStartOpt="$nameStartOpt $*"
+
SECURITY_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getozoneconf -confKey hadoop.security.authentication | tr '[:upper:]' '[:lower:]' 2>&-)
SECURITY_AUTHORIZATION_ENABLED=$("${HADOOP_HDFS_HOME}/bin/ozone" getozoneconf -confKey hadoop.security.authorization | tr '[:upper:]' '[:lower:]' 2>&-)
@@ -65,11 +85,97 @@ fi
#---------------------------------------------------------
# Start hdfs before starting ozone daemons
-if [[ -f "${bin}/start-dfs.sh" ]]; then
- "${bin}/start-dfs.sh"
-else
- echo "ERROR: Cannot execute ${bin}/start-dfs.sh." 2>&1
- exit 1
+
+#---------------------------------------------------------
+# namenodes
+
+NAMENODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -namenodes 2>/dev/null)
+
+if [[ -z "${NAMENODES}" ]]; then
+ NAMENODES=$(hostname)
+fi
+
+echo "Starting namenodes on [${NAMENODES}]"
+hadoop_uservar_su hdfs namenode "${HADOOP_HDFS_HOME}/bin/hdfs" \
+ --workers \
+ --config "${HADOOP_CONF_DIR}" \
+ --hostnames "${NAMENODES}" \
+ --daemon start \
+ namenode ${nameStartOpt}
+
+HADOOP_JUMBO_RETCOUNTER=$?
+
+#---------------------------------------------------------
+# datanodes (using default workers file)
+
+echo "Starting datanodes"
+hadoop_uservar_su hdfs datanode "${HADOOP_HDFS_HOME}/bin/ozone" \
+ --workers \
+ --config "${HADOOP_CONF_DIR}" \
+ --daemon start \
+ datanode ${dataStartOpt}
+(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
+
+#---------------------------------------------------------
+# secondary namenodes (if any)
+
+SECONDARY_NAMENODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -secondarynamenodes 2>/dev/null)
+
+if [[ -n "${SECONDARY_NAMENODES}" ]]; then
+
+ if [[ "${NAMENODES}" =~ , ]]; then
+
+ hadoop_error "WARNING: Highly available NameNode is configured."
+ hadoop_error "WARNING: Skipping SecondaryNameNode."
+
+ else
+
+ if [[ "${SECONDARY_NAMENODES}" == "0.0.0.0" ]]; then
+ SECONDARY_NAMENODES=$(hostname)
+ fi
+
+ echo "Starting secondary namenodes [${SECONDARY_NAMENODES}]"
+
+ hadoop_uservar_su hdfs secondarynamenode "${HADOOP_HDFS_HOME}/bin/hdfs" \
+ --workers \
+ --config "${HADOOP_CONF_DIR}" \
+ --hostnames "${SECONDARY_NAMENODES}" \
+ --daemon start \
+ secondarynamenode
+ (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
+ fi
+fi
+
+#---------------------------------------------------------
+# quorumjournal nodes (if any)
+
+JOURNAL_NODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -journalNodes 2>&-)
+
+if [[ "${#JOURNAL_NODES}" != 0 ]]; then
+ echo "Starting journal nodes [${JOURNAL_NODES}]"
+
+ hadoop_uservar_su hdfs journalnode "${HADOOP_HDFS_HOME}/bin/hdfs" \
+ --workers \
+ --config "${HADOOP_CONF_DIR}" \
+ --hostnames "${JOURNAL_NODES}" \
+ --daemon start \
+ journalnode
+ (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
+fi
+
+#---------------------------------------------------------
+# ZK Failover controllers, if auto-HA is enabled
+AUTOHA_ENABLED=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey dfs.ha.automatic-failover.enabled | tr '[:upper:]' '[:lower:]')
+if [[ "${AUTOHA_ENABLED}" = "true" ]]; then
+ echo "Starting ZK Failover Controllers on NN hosts [${NAMENODES}]"
+
+ hadoop_uservar_su hdfs zkfc "${HADOOP_HDFS_HOME}/bin/hdfs" \
+ --workers \
+ --config "${HADOOP_CONF_DIR}" \
+ --hostnames "${NAMENODES}" \
+ --daemon start \
+ zkfc
+ (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
fi
#---------------------------------------------------------
[04/18] hadoop git commit: YARN-8273. Log aggregation does not warn
if HDFS quota in target directory is exceeded (grepas via rkanter)
Posted by ha...@apache.org.
YARN-8273. Log aggregation does not warn if HDFS quota in target directory is exceeded (grepas via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b22f56c4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b22f56c4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b22f56c4
Branch: refs/heads/HDDS-48
Commit: b22f56c4719e63bd4f6edc2a075e0bcdb9442255
Parents: 83f53e5
Author: Robert Kanter <rk...@apache.org>
Authored: Tue May 22 14:24:38 2018 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Tue May 22 14:24:38 2018 -0700
----------------------------------------------------------------------
.../hadoop-yarn/hadoop-yarn-common/pom.xml | 4 ++
.../logaggregation/AggregatedLogFormat.java | 14 +++-
.../LogAggregationDFSException.java | 45 ++++++++++++
.../LogAggregationFileController.java | 4 +-
.../tfile/LogAggregationTFileController.java | 13 +++-
.../logaggregation/TestContainerLogsUtils.java | 4 +-
.../logaggregation/AppLogAggregatorImpl.java | 49 ++++++++++---
.../TestAppLogAggregatorImpl.java | 75 +++++++++++++++++---
.../nodemanager/webapp/TestNMWebServices.java | 7 +-
9 files changed, 183 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
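The heart of this patch is a control-flow change: local log files may only be deleted once the remote writer has been closed successfully, because a DSQuotaExceededException can surface at close() time; deleting earlier risks losing logs that never became durable. A minimal, self-contained sketch of that "capture the close failure, skip deletion, rethrow" pattern (QuotaException, UploadCycle and the helpers are illustrative, not the Hadoop classes):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

class QuotaException extends IOException {
  QuotaException(String msg) { super(msg); }
}

class UploadCycle {
  // Upload, then close; delete local copies only if the whole cycle succeeded.
  static void upload(List<Path> localLogs, Appendable remote)
      throws QuotaException {
    boolean cycleSucceeded = true;
    QuotaException deferred = null;
    try {
      for (Path p : localLogs) {
        remote.append(p.toString()); // stand-in for the real log write
      }
    } catch (IOException e) {
      cycleSucceeded = false; // write failed; keep the local files
    } finally {
      try {
        closeRemote(remote);
      } catch (QuotaException e) {
        cycleSucceeded = false; // data may not be durable
        deferred = e;
      }
      if (cycleSucceeded) {
        deleteQuietly(localLogs); // safe only after a successful close
      }
      if (deferred != null) {
        throw deferred; // surface the quota problem to the caller
      }
    }
  }

  static void closeRemote(Appendable remote) throws QuotaException {
    // a real implementation would flush and close an output stream here
  }

  static void deleteQuietly(List<Path> localLogs) {
    for (Path p : localLogs) {
      try { Files.deleteIfExists(p); } catch (IOException ignored) { }
    }
  }
}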
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index db6c11a..a25c524 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -40,6 +40,10 @@
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index af3066e..81d5053 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
@@ -547,7 +548,7 @@ public class AggregatedLogFormat {
}
@Override
- public void close() {
+ public void close() throws DSQuotaExceededException {
try {
if (writer != null) {
writer.close();
@@ -555,7 +556,16 @@ public class AggregatedLogFormat {
} catch (Exception e) {
LOG.warn("Exception closing writer", e);
} finally {
- IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
+ try {
+ this.fsDataOStream.close();
+ } catch (DSQuotaExceededException e) {
+ LOG.error("Exception in closing {}",
+ this.fsDataOStream.getClass(), e);
+ throw e;
+ } catch (Throwable e) {
+ LOG.error("Exception in closing {}",
+ this.fsDataOStream.getClass(), e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
new file mode 100644
index 0000000..19953e4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationDFSException.java
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * This exception class indicates an issue during log aggregation.
+ */
+public class LogAggregationDFSException extends YarnException {
+
+ private static final long serialVersionUID = -6691549081090183145L;
+
+ public LogAggregationDFSException() {
+ }
+
+ public LogAggregationDFSException(String message) {
+ super(message);
+ }
+
+ public LogAggregationDFSException(Throwable cause) {
+ super(cause);
+ }
+
+ public LogAggregationDFSException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 5ac89e9..d342e3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -162,8 +162,10 @@ public abstract class LogAggregationFileController {
/**
* Close the writer.
+ * @throws LogAggregationDFSException if the closing of the writer fails
+ * (for example due to HDFS quota being exceeded)
*/
- public abstract void closeWriter();
+ public abstract void closeWriter() throws LogAggregationDFSException;
/**
* Write the log content.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
index a4f50d2..e87af7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
@@ -95,10 +97,15 @@ public class LogAggregationTFileController
}
@Override
- public void closeWriter() {
+ public void closeWriter() throws LogAggregationDFSException {
if (this.writer != null) {
- this.writer.close();
- this.writer = null;
+ try {
+ this.writer.close();
+ } catch (DSQuotaExceededException e) {
+ throw new LogAggregationDFSException(e);
+ } finally {
+ this.writer = null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index a12e2a1..4767282 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -65,7 +65,7 @@ public final class TestContainerLogsUtils {
public static void createContainerLogFileInRemoteFS(Configuration conf,
FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId,
String fileName, String user, String content,
- boolean deleteRemoteLogDir) throws IOException {
+ boolean deleteRemoteLogDir) throws Exception {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
//prepare the logs for remote directory
ApplicationId appId = containerId.getApplicationAttemptId()
@@ -113,7 +113,7 @@ public final class TestContainerLogsUtils {
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
- ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
+ ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
LogAggregationFileControllerFactory factory
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index c7e06ff..5956823 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
@@ -263,7 +264,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return params;
}
- private void uploadLogsForContainers(boolean appFinished) {
+ private void uploadLogsForContainers(boolean appFinished)
+ throws LogAggregationDFSException {
if (this.logAggregationDisabled) {
return;
}
@@ -301,6 +303,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
logAggregationTimes++;
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
+ DeletionTask deletionTask = null;
try {
try {
logAggregationFileController.initializeWriter(logControllerContext);
@@ -327,10 +330,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
uploadedLogsInThisCycle = true;
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
- DeletionTask deletionTask = new FileDeletionTask(delService,
+ deletionTask = new FileDeletionTask(delService,
this.userUgi.getShortUserName(), null,
uploadedFilePathsInThisCycleList);
- delService.delete(deletionTask);
}
// This container is finished, and all its logs have been uploaded,
@@ -356,9 +358,23 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
logAggregationSucceedInThisCycle = false;
}
} finally {
+ LogAggregationDFSException exc = null;
+ try {
+ this.logAggregationFileController.closeWriter();
+ } catch (LogAggregationDFSException e) {
+ diagnosticMessage = e.getMessage();
+ renameTemporaryLogFileFailed = true;
+ logAggregationSucceedInThisCycle = false;
+ exc = e;
+ }
+ if (logAggregationSucceedInThisCycle && deletionTask != null) {
+ delService.delete(deletionTask);
+ }
sendLogAggregationReport(logAggregationSucceedInThisCycle,
diagnosticMessage, appFinished);
- logAggregationFileController.closeWriter();
+ if (exc != null) {
+ throw exc;
+ }
}
}
@@ -413,13 +429,18 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
diagnosticMessage, finalized);
}
- @SuppressWarnings("unchecked")
@Override
public void run() {
try {
doAppLogAggregation();
+ } catch (LogAggregationDFSException e) {
+ // if the log aggregation could not be performed due to DFS issues
+ // let's not clean up the log files, since that can result in
+ // loss of logs
+ LOG.error("Error occurred while aggregating the log for the application "
+ + appId, e);
} catch (Exception e) {
- // do post clean up of log directories on any exception
+ // do post clean up of log directories on any other exception
LOG.error("Error occurred while aggregating the log for the application "
+ appId, e);
doAppLogAggregationPostCleanUp();
@@ -434,8 +455,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
}
- @SuppressWarnings("unchecked")
- private void doAppLogAggregation() {
+ private void doAppLogAggregation() throws LogAggregationDFSException {
while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
@@ -452,6 +472,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} catch (InterruptedException e) {
LOG.warn("PendingContainers queue is interrupted");
this.appFinishing.set(true);
+ } catch (LogAggregationDFSException e) {
+ this.appFinishing.set(true);
+ throw e;
}
}
}
@@ -460,10 +483,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return;
}
- // App is finished, upload the container logs.
- uploadLogsForContainers(true);
+ try {
+ // App is finished, upload the container logs.
+ uploadLogsForContainers(true);
- doAppLogAggregationPostCleanUp();
+ doAppLogAggregationPostCleanUp();
+ } catch (LogAggregationDFSException e) {
+ LOG.error("Error during log aggregation", e);
+ }
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index e13c805..95f4c32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -42,7 +43,9 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -52,12 +55,14 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -228,10 +233,15 @@ public class TestAppLogAggregatorImpl {
config.setLong(
YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs);
+ LogAggregationTFileController format = spy(
+ new LogAggregationTFileController());
+ format.initialize(config, "TFile");
+
+ Context context = createContext(config);
final AppLogAggregatorInTest appLogAggregator =
createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
- config, recoveredLogInitedTimeMillis,
- deletionServiceWithExpectedFiles);
+ config, context, recoveredLogInitedTimeMillis,
+ deletionServiceWithExpectedFiles, format);
appLogAggregator.startContainerLogAggregation(
new ContainerLogContext(containerId, ContainerType.TASK, 0));
// set app finished flag first
@@ -269,8 +279,10 @@ public class TestAppLogAggregatorImpl {
private static AppLogAggregatorInTest createAppLogAggregator(
ApplicationId applicationId, String rootLogDir,
- YarnConfiguration config, long recoveredLogInitedTimeMillis,
- DeletionService deletionServiceWithFilesToExpect)
+ YarnConfiguration config, Context context,
+ long recoveredLogInitedTimeMillis,
+ DeletionService deletionServiceWithFilesToExpect,
+ LogAggregationTFileController tFileController)
throws IOException {
final Dispatcher dispatcher = createNullDispatcher();
@@ -284,16 +296,12 @@ public class TestAppLogAggregatorImpl {
final LogAggregationContext logAggregationContext = null;
final Map<ApplicationAccessType, String> appAcls = new HashMap<>();
- final Context context = createContext(config);
final FileContext fakeLfs = mock(FileContext.class);
final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
- LogAggregationTFileController format = spy(
- new LogAggregationTFileController());
- format.initialize(config, "TFile");
return new AppLogAggregatorInTest(dispatcher, deletionService,
config, applicationId, ugi, nodeId, dirsService,
remoteLogDirForApp, appAcls, logAggregationContext,
- context, fakeLfs, recoveredLogInitedTimeMillis, format);
+ context, fakeLfs, recoveredLogInitedTimeMillis, tFileController);
}
/**
@@ -423,4 +431,53 @@ public class TestAppLogAggregatorImpl {
this.logValue = ArgumentCaptor.forClass(LogValue.class);
}
}
+
+ @Test
+ public void testDFSQuotaExceeded() throws Exception {
+
+ // the expectation is that no log files are deleted if the quota has
+ // been exceeded, since that would result in loss of logs
+ DeletionService deletionServiceWithExpectedFiles =
+ createDeletionServiceWithExpectedFile2Delete(Collections.emptySet());
+
+ final YarnConfiguration config = new YarnConfiguration();
+
+ ApplicationId appId = ApplicationId.newInstance(1357543L, 1);
+
+ // we need a LogAggregationTFileController that throws a
+ // LogAggregationDFSException
+ LogAggregationTFileController format =
+ Mockito.mock(LogAggregationTFileController.class);
+ Mockito.doThrow(new LogAggregationDFSException())
+ .when(format).closeWriter();
+
+ NodeManager.NMContext context = (NMContext) createContext(config);
+ context.setNMLogAggregationStatusTracker(
+ Mockito.mock(NMLogAggregationStatusTracker.class));
+
+ final AppLogAggregatorInTest appLogAggregator =
+ createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
+ config, context, 1000L, deletionServiceWithExpectedFiles, format);
+
+ appLogAggregator.startContainerLogAggregation(
+ new ContainerLogContext(
+ ContainerId.newContainerId(
+ ApplicationAttemptId.newInstance(appId, 0), 0),
+ ContainerType.TASK, 0));
+ // set app finished flag first
+ appLogAggregator.finishLogAggregation();
+ appLogAggregator.run();
+
+ // verify that no files have been uploaded
+ ArgumentCaptor<LogValue> logValCaptor =
+ ArgumentCaptor.forClass(LogValue.class);
+ verify(appLogAggregator.getLogAggregationFileController()).write(
+ any(LogKey.class), logValCaptor.capture());
+ Set<String> filesUploaded = new HashSet<>();
+ LogValue logValue = logValCaptor.getValue();
+ for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
+ filesUploaded.add(file.getAbsolutePath());
+ }
+ verifyFilesUploaded(filesUploaded, Collections.emptySet());
+ }
}
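The new test relies on a standard Mockito idiom for making a stubbed void method throw: doThrow(...).when(mock).method(). A minimal sketch of that idiom outside the YARN classes (Closer is a hypothetical interface, not a Hadoop type):

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import org.junit.Assert;
import org.junit.Test;

public class DoThrowSketchTest {

  interface Closer {
    void closeWriter() throws Exception;
  }

  @Test
  public void stubbedVoidMethodThrows() throws Exception {
    Closer closer = mock(Closer.class);
    // For void methods, doThrow(...).when(...) replaces when(...).thenThrow(...).
    doThrow(new IllegalStateException("quota exceeded"))
        .when(closer).closeWriter();
    try {
      closer.closeWriter();
      Assert.fail("expected IllegalStateException");
    } catch (IllegalStateException expected) {
      // the stubbed exception reached the caller
    }
  }
}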
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b22f56c4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 39e403d..dbd980b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -87,7 +87,6 @@ import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
-import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.net.HttpURLConnection;
@@ -356,7 +355,7 @@ public class TestNMWebServices extends JerseyTestBase {
}
@Test (timeout = 5000)
- public void testContainerLogsWithNewAPI() throws IOException, JSONException{
+ public void testContainerLogsWithNewAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containers")
@@ -365,7 +364,7 @@ public class TestNMWebServices extends JerseyTestBase {
}
@Test (timeout = 5000)
- public void testContainerLogsWithOldAPI() throws IOException, JSONException{
+ public void testContainerLogsWithOldAPI() throws Exception {
final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1);
WebResource r = resource();
r = r.path("ws").path("v1").path("node").path("containerlogs")
@@ -538,7 +537,7 @@ public class TestNMWebServices extends JerseyTestBase {
}
private void testContainerLogs(WebResource r, ContainerId containerId)
- throws IOException {
+ throws Exception {
final String containerIdStr = containerId.toString();
final ApplicationAttemptId appAttemptId = containerId
.getApplicationAttemptId();
[14/18] hadoop git commit: YARN-8297. Incorrect ATS URL used for
wire-encrypted cluster (addendum). Contributed by Sunil G.
Posted by ha...@apache.org.
YARN-8297. Incorrect ATS URL used for wire-encrypted cluster (addendum). Contributed by Sunil G.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f61e3e75
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f61e3e75
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f61e3e75
Branch: refs/heads/HDDS-48
Commit: f61e3e752eb1cf4a08030da04bc3d6c5a2b3926d
Parents: 9837ca9
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Wed May 23 18:31:03 2018 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Wed May 23 18:31:03 2018 +0530
----------------------------------------------------------------------
.../src/main/webapp/app/initializers/loader.js | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f61e3e75/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-ui/src/main/webapp/app/initializers/loader.js
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-ui/src/main/webapp/app/initializers/loader.js b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-ui/src/main/webapp/app/initializers/loader.js
index 53f9c44..01daa7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-ui/src/main/webapp/app/initializers/loader.js
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-ui/src/main/webapp/app/initializers/loader.js
@@ -31,7 +31,7 @@ function getYarnHttpProtocolScheme(rmhost, application) {
$.ajax({
type: 'GET',
dataType: 'json',
- async: true,
+ async: false,
context: this,
url: httpUrl,
success: function(data) {
@@ -44,7 +44,7 @@ function getYarnHttpProtocolScheme(rmhost, application) {
application.advanceReadiness();
}
});
- return protocolScheme == "HTTPS_ONLY";
+ return protocolScheme;
}
function getTimeLineURL(rmhost, isHttpsSchemeEnabled) {
@@ -97,7 +97,9 @@ function updateConfigs(application) {
Ember.Logger.log("RM Address: " + rmhost);
- var isHttpsSchemeEnabled = getYarnHttpProtocolScheme(rmhost, application);
+ var protocolSchemeFromRM = getYarnHttpProtocolScheme(rmhost, application);
+ Ember.Logger.log("Is protocol scheme https? " + (protocolSchemeFromRM == "HTTPS_ONLY"));
+ var isHttpsSchemeEnabled = (protocolSchemeFromRM == "HTTPS_ONLY");
if(!ENV.hosts.timelineWebAddress) {
var timelinehost = "";
$.ajax({
@@ -137,7 +139,7 @@ function updateConfigs(application) {
$.ajax({
type: 'GET',
dataType: 'json',
- async: true,
+ async: false,
context: this,
url: getTimeLineV1URL(rmhost, isHttpsSchemeEnabled),
success: function(data) {
@@ -171,7 +173,7 @@ function updateConfigs(application) {
$.ajax({
type: 'GET',
dataType: 'json',
- async: true,
+ async: false,
context: this,
url: getSecurityURL(rmhost),
success: function(data) {
[06/18] hadoop git commit: HDDS-79. Remove ReportState from
SCMHeartbeatRequestProto. Contributed by Nanda kumar.
Posted by ha...@apache.org.
HDDS-79. Remove ReportState from SCMHeartbeatRequestProto. Contributed by Nanda kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/68c7fd8e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/68c7fd8e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/68c7fd8e
Branch: refs/heads/HDDS-48
Commit: 68c7fd8e6092e8436ecf96852c608708f311f262
Parents: 43be9ab
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue May 22 15:46:59 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue May 22 15:46:59 2018 -0700
----------------------------------------------------------------------
.../common/impl/ContainerManagerImpl.java | 14 +---
.../common/impl/ContainerReportManagerImpl.java | 43 +++---------
.../common/interfaces/ContainerManager.java | 7 --
.../interfaces/ContainerReportManager.java | 8 +--
.../statemachine/DatanodeStateMachine.java | 1 -
.../common/statemachine/StateContext.java | 38 ----------
.../states/endpoint/HeartbeatEndpointTask.java | 3 +-
.../container/ozoneimpl/OzoneContainer.java | 9 ---
.../StorageContainerDatanodeProtocol.java | 5 +-
.../protocol/StorageContainerNodeProtocol.java | 5 +-
...rDatanodeProtocolClientSideTranslatorPB.java | 5 +-
...rDatanodeProtocolServerSideTranslatorPB.java | 3 +-
.../StorageContainerDatanodeProtocol.proto | 39 -----------
.../ozone/container/common/ScmTestMock.java | 13 +---
.../common/TestDatanodeStateMachine.java | 7 --
.../hdds/scm/node/HeartbeatQueueItem.java | 23 +-----
.../hadoop/hdds/scm/node/SCMNodeManager.java | 30 +-------
.../scm/server/SCMDatanodeProtocolServer.java | 6 +-
.../hdds/scm/container/MockNodeManager.java | 5 +-
.../hdds/scm/node/TestContainerPlacement.java | 9 +--
.../hadoop/hdds/scm/node/TestNodeManager.java | 74 +++++++++-----------
.../ozone/container/common/TestEndPoint.java | 11 +--
.../testutils/ReplicationNodeManagerMock.java | 5 +-
.../ozone/TestStorageContainerManager.java | 5 +-
24 files changed, 63 insertions(+), 305 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index 3a78c70..faee5d0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
@@ -1072,16 +1070,8 @@ public class ContainerManagerImpl implements ContainerManager {
@Override
public long getNumKeys(long containerId) {
ContainerData cData = containerMap.get(containerId);
- return cData.getKeyCount(); }
-
- /**
- * Get the container report state to send via HB to SCM.
- *
- * @return container report state.
- */
- @Override
- public ReportState getContainerReportState() {
- return containerReportManager.getContainerReportState();
+ return cData.getKeyCount();
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
index 6c83c66..f1d3f7f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
@@ -19,15 +19,12 @@ package org.apache.hadoop.ozone.container.common.impl;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces
.ContainerReportManager;
import org.apache.hadoop.util.Time;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
@@ -40,15 +37,9 @@ public class ContainerReportManagerImpl implements ContainerReportManager {
private long lastContainerReportTime;
private final long containerReportInterval;
private final long heartbeatInterval;
- private AtomicLong reportCount;
- private static final ReportState NO_CONTAINER_REPORTSTATE =
- ReportState.newBuilder()
- .setState(ReportState.states.noContainerReports)
- .setCount(0).build();
public ContainerReportManagerImpl(Configuration config) {
this.lastContainerReportTime = -1;
- this.reportCount = new AtomicLong(0L);
this.containerReportInterval = config.getTimeDuration(
OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL,
OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT,
@@ -56,32 +47,18 @@ public class ContainerReportManagerImpl implements ContainerReportManager {
this.heartbeatInterval = getScmHeartbeatInterval(config);
}
- public ReportState getContainerReportState() {
+ public boolean shouldSendContainerReport() {
if (lastContainerReportTime < 0) {
- return getFullContainerReportState();
- } else {
- // Add a random delay (0~30s) on top of the container report
- // interval (60s) so tha the SCM is overwhelmed by the container reports
- // sent in sync.
- if (Time.monotonicNow() - lastContainerReportTime >
- (containerReportInterval + getRandomReportDelay())) {
- return getFullContainerReportState();
- } else {
- return getNoContainerReportState();
- }
+ return true;
}
- }
-
- private ReportState getFullContainerReportState() {
- ReportState.Builder rsBuilder = ReportState.newBuilder();
- rsBuilder.setState(ReportState.states.completeContinerReport);
- rsBuilder.setCount(reportCount.incrementAndGet());
- this.lastContainerReportTime = Time.monotonicNow();
- return rsBuilder.build();
- }
-
- private ReportState getNoContainerReportState() {
- return NO_CONTAINER_REPORTSTATE;
+ // Add a random delay (0~30s) on top of the container report
+ // interval (60s) so that the SCM is not overwhelmed by the container
+ // reports sent in sync.
+ if (Time.monotonicNow() - lastContainerReportTime >
+ (containerReportInterval + getRandomReportDelay())) {
+ return true;
+ }
+ return false;
}
private long getRandomReportDelay() {
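The replacement logic above is a simple jittered timer: report immediately the first time, then again only after the interval plus a random delay has elapsed, so datanodes do not report in lockstep. A self-contained sketch of the same idea (class, field, and method names are illustrative; it assumes maxJitterMillis >= 0):

import java.util.concurrent.ThreadLocalRandom;

class JitteredReportTimer {
  private final long intervalMillis;
  private final long maxJitterMillis;
  private long lastReportTime = -1;

  JitteredReportTimer(long intervalMillis, long maxJitterMillis) {
    this.intervalMillis = intervalMillis;
    this.maxJitterMillis = maxJitterMillis;
  }

  // Returns true when a report should be sent now, and records the send.
  synchronized boolean shouldSendReport(long nowMillis) {
    if (lastReportTime < 0) {
      lastReportTime = nowMillis; // first call: always report
      return true;
    }
    // Fresh jitter per check spreads reports from many nodes over time.
    long jitter = ThreadLocalRandom.current().nextLong(maxJitterMillis + 1);
    if (nowMillis - lastReportTime > intervalMillis + jitter) {
      lastReportTime = nowMillis;
      return true;
    }
    return false;
  }
}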
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
index 84d95f8..3a1a73d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@@ -266,9 +264,4 @@ public interface ContainerManager extends RwLock {
*/
long getNumKeys(long containerId);
- /**
- * Get the container report state to send via HB to SCM.
- * @return container report state.
- */
- ReportState getContainerReportState();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java
index 4689dfe..6d7557b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java
@@ -17,16 +17,14 @@
*/
package org.apache.hadoop.ozone.container.common.interfaces;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
-
/**
* Interface for container report manager operations.
*/
public interface ContainerReportManager {
/**
- * Get the container report state.
- * @return the container report state.
+ * Check if we have to send a container report.
+ * @return true if a container report has to be sent.
*/
- ReportState getContainerReportState();
+ boolean shouldSendContainerReport();
}
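A hypothetical datanode-side caller would consult this predicate before assembling a report; the wrapper class and buildAndSendReport() below are illustrative names, not part of the patch:

    // Hedged usage sketch of the new interface method.
    class ReportCycle {
      private final ContainerReportManager reportManager;

      ReportCycle(ContainerReportManager reportManager) {
        this.reportManager = reportManager;
      }

      void run() {
        if (reportManager.shouldSendContainerReport()) {
          buildAndSendReport();  // hypothetical report path
        }
        // Otherwise skip this cycle; no placeholder ReportState is sent.
      }

      private void buildAndSendReport() { /* elided for brevity */ }
    }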
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ef1ba59..a16bfdc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -135,7 +135,6 @@ public class DatanodeStateMachine implements Closeable {
LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
nextHB.set(Time.monotonicNow() + heartbeatFrequency);
context.setReportState(container.getNodeReport());
- context.setContainerReportState(container.getContainerReportState());
context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS);
now = Time.monotonicNow();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 55476fd..27eb57e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -18,8 +18,6 @@ package org.apache.hadoop.ozone.container.common.statemachine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
@@ -40,9 +38,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState.states
- .noContainerReports;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
/**
@@ -58,9 +53,6 @@ public class StateContext {
private final Configuration conf;
private DatanodeStateMachine.DatanodeStates state;
private SCMNodeReport nrState;
- private ReportState reportState;
- private static final ReportState DEFAULT_REPORT_STATE =
- ReportState.newBuilder().setState(noContainerReports).setCount(0).build();
/**
* Constructs a StateContext.
@@ -212,7 +204,6 @@ public class StateContext {
if (isExiting(newState)) {
task.onExit();
}
- this.clearReportState();
this.setState(newState);
}
}
@@ -253,33 +244,4 @@ public class StateContext {
return stateExecutionCount.get();
}
-
- /**
- * Gets the ReportState.
- * @return ReportState.
- */
- public synchronized ReportState getContainerReportState() {
- if (reportState == null) {
- return DEFAULT_REPORT_STATE;
- }
- return reportState;
- }
-
- /**
- * Sets the ReportState.
- * @param rState - ReportState.
- */
- public synchronized void setContainerReportState(ReportState rState) {
- this.reportState = rState;
- }
-
- /**
- * Clears report state after it has been communicated.
- */
- public synchronized void clearReportState() {
- if(reportState != null) {
- setContainerReportState(null);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index eba565d..2f1db39 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -99,8 +99,7 @@ public class HeartbeatEndpointTask
Preconditions.checkState(this.datanodeDetailsProto != null);
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
- .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport(),
- this.context.getContainerReportState());
+ .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport());
processResponse(reponse, datanodeDetailsProto);
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 33a5971..1fc79d7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -25,8 +25,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@@ -267,11 +265,4 @@ public class OzoneContainer {
return this.manager;
}
- /**
- * Get the container report state to send via HB to SCM.
- * @return the container report state.
- */
- public ReportState getContainerReportState() {
- return this.manager.getContainerReportState();
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index 8aa397b..e2a3bf5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
@@ -59,12 +57,11 @@ public interface StorageContainerDatanodeProtocol {
* Used by data node to send a Heartbeat.
* @param datanodeDetails - Datanode Details.
* @param nodeReport - node report state
- * @param reportState - container report state.
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails,
- SCMNodeReport nodeReport, ReportState reportState) throws IOException;
+ SCMNodeReport nodeReport) throws IOException;
/**
* Register Datanode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index 5d1d434..14038fb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
@@ -61,10 +59,9 @@ public interface StorageContainerNodeProtocol {
* Send heartbeat to indicate the datanode is alive and doing well.
* @param datanodeDetails - Datanode ID.
* @param nodeReport - node report.
- * @param reportState - container report.
* @return SCMheartbeat response list
*/
List<SCMCommand> sendHeartbeat(DatanodeDetailsProto datanodeDetails,
- SCMNodeReport nodeReport, ReportState reportState);
+ SCMNodeReport nodeReport);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index 95d4cb0..a56c57a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@@ -133,12 +131,11 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
@Override
public SCMHeartbeatResponseProto sendHeartbeat(
DatanodeDetailsProto datanodeDetailsProto,
- SCMNodeReport nodeReport, ReportState reportState) throws IOException {
+ SCMNodeReport nodeReport) throws IOException {
SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
.newBuilder();
req.setDatanodeDetails(datanodeDetailsProto);
req.setNodeReport(nodeReport);
- req.setContainerReportState(reportState);
final SCMHeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());
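With the setContainerReportState call gone, a heartbeat request now carries only the datanode details and the node report. Using the same generated builder as the translator above (the details and nodeReport variables are assumed to be populated elsewhere):

    // Sketch of the slimmed-down heartbeat request.
    SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
        .setDatanodeDetails(details)   // required field
        .setNodeReport(nodeReport)     // optional field
        .build();                      // no containerReportState any more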
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 139f04c..07dba57 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -88,8 +88,7 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
SCMHeartbeatRequestProto request) throws ServiceException {
try {
return impl.sendHeartbeat(request.getDatanodeDetails(),
- request.getNodeReport(),
- request.getContainerReportState());
+ request.getNodeReport());
} catch (IOException e) {
throw new ServiceException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 2b34d11..91070b3 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -42,45 +42,6 @@ import "hdds.proto";
message SCMHeartbeatRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
optional SCMNodeReport nodeReport = 2;
- optional ReportState containerReportState = 3;
-}
-
-enum DatanodeContainerState {
- closed = 0;
- open = 1;
-}
-
-/**
-NodeState contains messages from datanode to SCM saying that it has
-some information that SCM might be interested in.*/
-message ReportState {
- enum states {
- noContainerReports = 0;
- completeContinerReport = 1;
- deltaContainerReport = 2;
- }
- required states state = 1;
- required int64 count = 2 [default = 0];
-}
-
-
-/**
-This message is used to persist the information about a container in the
-SCM database, This information allows SCM to startup faster and avoid having
-all container info in memory all the time.
- */
-message ContainerPersistanceProto {
- required DatanodeContainerState state = 1;
- required hadoop.hdds.Pipeline pipeline = 2;
- required ContainerInfo info = 3;
-}
-
-/**
-This message is used to do a quick look up of which containers are effected
-if a node goes down
-*/
-message NodeContianerMapping {
- repeated string contianerName = 1;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 948367a..c57a366 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -30,8 +30,6 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@@ -53,7 +51,6 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
private int rpcResponseDelay;
private AtomicInteger heartbeatCount = new AtomicInteger(0);
private AtomicInteger rpcCount = new AtomicInteger(0);
- private ReportState reportState;
private AtomicInteger containerReportsCount = new AtomicInteger(0);
// Map of datanode to containers
@@ -177,11 +174,10 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
@Override
public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
sendHeartbeat(DatanodeDetailsProto datanodeDetailsProto,
- SCMNodeReport nodeReport, ReportState scmReportState)
+ SCMNodeReport nodeReport)
throws IOException {
rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet();
- this.reportState = scmReportState;
sleepIfNeeded();
List<SCMCommandResponseProto>
cmdResponses = new LinkedList<>();
@@ -298,19 +294,12 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
.newBuilder().getDefaultInstanceForType();
}
- public ReportState getReportState() {
- return this.reportState;
- }
-
/**
* Reset the mock Scm for test to get a fresh start without rebuild MockScm.
*/
public void reset() {
heartbeatCount.set(0);
rpcCount.set(0);
- reportState = ReportState.newBuilder()
- .setState(ReportState.states.noContainerReports)
- .setCount(0).build();
containerReportsCount.set(0);
nodeContainers.clear();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 9a9aab1..ee82c57 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -21,8 +21,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@@ -305,11 +303,6 @@ public class TestDatanodeStateMachine {
for (ScmTestMock mock : mockServers) {
Assert.assertEquals(1, mock.getHeartbeatCount());
- // Assert that heartbeat did indeed carry that State that we said
- // have in the datanode.
- Assert.assertEquals(mock.getReportState().getState().getNumber(),
- StorageContainerDatanodeProtocolProtos.ReportState.states
- .noContainerReports.getNumber());
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
index 43720f0..05a9fc3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/HeartbeatQueueItem.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import static org.apache.hadoop.util.Time.monotonicNow;
@@ -34,21 +32,18 @@ public class HeartbeatQueueItem {
private DatanodeDetails datanodeDetails;
private long recvTimestamp;
private SCMNodeReport nodeReport;
- private ReportState containerReportState;
/**
*
* @param datanodeDetails - datanode ID of the heartbeat.
* @param recvTimestamp - heartbeat receive timestamp.
* @param nodeReport - node report associated with the heartbeat if any.
- * @param containerReportState - container report state.
*/
HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp,
- SCMNodeReport nodeReport, ReportState containerReportState) {
+ SCMNodeReport nodeReport) {
this.datanodeDetails = datanodeDetails;
this.recvTimestamp = recvTimestamp;
this.nodeReport = nodeReport;
- this.containerReportState = containerReportState;
}
/**
@@ -66,13 +61,6 @@ public class HeartbeatQueueItem {
}
/**
- * @return container report state.
- */
- public ReportState getContainerReportState() {
- return containerReportState;
- }
-
- /**
* @return heartbeat receive timestamp.
*/
public long getRecvTimestamp() {
@@ -85,7 +73,6 @@ public class HeartbeatQueueItem {
public static class Builder {
private DatanodeDetails datanodeDetails;
private SCMNodeReport nodeReport;
- private ReportState containerReportState;
private long recvTimestamp = monotonicNow();
public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
@@ -98,11 +85,6 @@ public class HeartbeatQueueItem {
return this;
}
- public Builder setContainerReportState(ReportState crs) {
- this.containerReportState = crs;
- return this;
- }
-
@VisibleForTesting
public Builder setRecvTimestamp(long recvTime) {
this.recvTimestamp = recvTime;
@@ -110,8 +92,7 @@ public class HeartbeatQueueItem {
}
public HeartbeatQueueItem build() {
- return new HeartbeatQueueItem(datanodeDetails, recvTimestamp, nodeReport,
- containerReportState);
+ return new HeartbeatQueueItem(datanodeDetails, recvTimestamp, nodeReport);
}
}
}
\ No newline at end of file
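The trimmed builder is exercised later in this patch (see the SCMNodeManager hunk below); in isolation the construction now reads:

    // Building a queue item without any container report state.
    HeartbeatQueueItem item = new HeartbeatQueueItem.Builder()
        .setDatanodeDetails(datanodeDetails)
        .setNodeReport(nodeReport)
        .build();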
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index cf1d8a5..353a069 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
@@ -48,7 +46,6 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
@@ -609,8 +606,6 @@ public class SCMNodeManager
if (healthyNodes.containsKey(datanodeUuid)) {
healthyNodes.put(datanodeUuid, processTimestamp);
updateNodeStat(datanodeUuid, nodeReport);
- updateCommandQueue(datanodeUuid,
- hbItem.getContainerReportState().getState());
return;
}
@@ -622,8 +617,6 @@ public class SCMNodeManager
healthyNodeCount.incrementAndGet();
staleNodeCount.decrementAndGet();
updateNodeStat(datanodeUuid, nodeReport);
- updateCommandQueue(datanodeUuid,
- hbItem.getContainerReportState().getState());
return;
}
@@ -635,8 +628,6 @@ public class SCMNodeManager
deadNodeCount.decrementAndGet();
healthyNodeCount.incrementAndGet();
updateNodeStat(datanodeUuid, nodeReport);
- updateCommandQueue(datanodeUuid,
- hbItem.getContainerReportState().getState());
return;
}
@@ -671,22 +662,6 @@ public class SCMNodeManager
}
}
- private void updateCommandQueue(UUID dnId,
- ReportState.states containerReportState) {
- if (containerReportState != null) {
- switch (containerReportState) {
- case completeContinerReport:
- commandQueue.addCommand(dnId,
- SendContainerCommand.newBuilder().build());
- return;
- case deltaContainerReport:
- case noContainerReports:
- default:
- // do nothing
- }
- }
- }
-
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.
@@ -829,14 +804,12 @@ public class SCMNodeManager
*
* @param datanodeDetailsProto - DatanodeDetailsProto.
* @param nodeReport - node report.
- * @param containerReportState - container report state.
* @return SCMheartbeat response.
* @throws IOException
*/
@Override
public List<SCMCommand> sendHeartbeat(
- DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport,
- ReportState containerReportState) {
+ DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport) {
Preconditions.checkNotNull(datanodeDetailsProto, "Heartbeat is missing " +
"DatanodeDetails.");
@@ -851,7 +824,6 @@ public class SCMNodeManager
new HeartbeatQueueItem.Builder()
.setDatanodeDetails(datanodeDetails)
.setNodeReport(nodeReport)
- .setContainerReportState(containerReportState)
.build());
return commandQueue.getCommand(datanodeDetails.getUuid());
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 91ed032..58b8c82 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -153,12 +153,10 @@ public class SCMDatanodeProtocolServer implements
@Override
public SCMHeartbeatResponseProto sendHeartbeat(
HddsProtos.DatanodeDetailsProto datanodeDetails,
- StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport,
- StorageContainerDatanodeProtocolProtos.ReportState reportState)
+ StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport)
throws IOException {
List<SCMCommand> commands =
- scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport,
- reportState);
+ scm.getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport);
List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
for (SCMCommand cmd : commands) {
cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid()));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index d8b8b5e..a46d7ba 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
@@ -387,13 +385,12 @@ public class MockNodeManager implements NodeManager {
*
* @param datanodeDetails - Datanode ID.
* @param nodeReport - node report.
- * @param containerReportState - container report state.
* @return SCMheartbeat response list
*/
@Override
public List<SCMCommand> sendHeartbeat(
HddsProtos.DatanodeDetailsProto datanodeDetails,
- SCMNodeReport nodeReport, ReportState containerReportState) {
+ SCMNodeReport nodeReport) {
if ((datanodeDetails != null) && (nodeReport != null) && (nodeReport
.getStorageReportCount() > 0)) {
SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 321e4e2..09b6cd1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -34,8 +33,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -70,10 +67,6 @@ public class TestContainerPlacement {
private static XceiverClientManager xceiverClientManager =
new XceiverClientManager(new OzoneConfiguration());
- private ReportState reportState = ReportState.newBuilder()
- .setState(ReportState.states.noContainerReports)
- .setCount(0).build();
-
/**
* Returns a new copy of Configuration.
*
@@ -143,7 +136,7 @@ public class TestContainerPlacement {
List<SCMStorageReport> reports = TestUtils
.createStorageReport(capacity, used, remaining, path, null, id, 1);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
- TestUtils.createNodeReport(reports), reportState);
+ TestUtils.createNodeReport(reports));
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
index 9fe38ce..36e796f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
@@ -26,10 +26,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -82,10 +78,6 @@ public class TestNodeManager {
private File testDir;
- private ReportState reportState = ReportState.newBuilder()
- .setState(ReportState.states.noContainerReports)
- .setCount(0).build();
-
@Rule
public ExpectedException thrown = ExpectedException.none();
@@ -153,7 +145,7 @@ public class TestNodeManager {
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
nodeManager);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
- null, reportState);
+ null);
}
// Wait for 4 seconds max.
@@ -200,8 +192,7 @@ public class TestNodeManager {
// Need 100 nodes to come out of chill mode, only one node is sending HB.
nodeManager.setMinimumChillModeNodes(100);
nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager)
- .getProtoBufMessage(),
- null, reportState);
+ .getProtoBufMessage(), null);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have" +
@@ -229,7 +220,7 @@ public class TestNodeManager {
// Send 10 heartbeat from same node, and assert we never leave chill mode.
for (int x = 0; x < 10; x++) {
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
- null, reportState);
+ null);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
@@ -260,7 +251,7 @@ public class TestNodeManager {
// These should never be processed.
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
- null, reportState);
+ null);
// Let us just wait for 2 seconds to prove that HBs are not processed.
Thread.sleep(2 * 1000);
@@ -289,8 +280,7 @@ public class TestNodeManager {
nodemanager.register(datanodeDetails.getProtoBufMessage(),
TestUtils.createNodeReport(reports));
List<SCMCommand> command = nodemanager.sendHeartbeat(
- datanodeDetails.getProtoBufMessage(),
- null, reportState);
+ datanodeDetails.getProtoBufMessage(), null);
Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
Assert.assertTrue("On regular HB calls, SCM responses a "
+ "datanode with an empty command list", command.isEmpty());
@@ -309,7 +299,7 @@ public class TestNodeManager {
@Override public Boolean get() {
List<SCMCommand> command =
nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
- null, reportState);
+ null);
return command.size() == 1 && command.get(0).getType()
.equals(SCMCmdType.reregisterCommand);
}
@@ -341,7 +331,7 @@ public class TestNodeManager {
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
nodeManager);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
- null, reportState);
+ null);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
@@ -433,18 +423,18 @@ public class TestNodeManager {
// Heartbeat once
nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(),
- null, reportState);
+ null);
// Heartbeat all other nodes.
for (DatanodeDetails dn : nodeList) {
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
+ nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
}
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
for (DatanodeDetails dn : nodeList) {
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
+ nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
}
// Wait for 2 seconds, wait a total of 4 seconds to make sure that the
@@ -461,7 +451,7 @@ public class TestNodeManager {
// heartbeat good nodes again.
for (DatanodeDetails dn : nodeList) {
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
+ nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
}
// 6 seconds is the dead window for this test , so we wait a total of
@@ -497,7 +487,7 @@ public class TestNodeManager {
public void testScmCheckForErrorOnNullDatanodeDetails() throws IOException,
InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
- nodeManager.sendHeartbeat(null, null, reportState);
+ nodeManager.sendHeartbeat(null, null);
} catch (NullPointerException npe) {
GenericTestUtils.assertExceptionContains("Heartbeat is missing " +
"DatanodeDetails.", npe);
@@ -575,11 +565,11 @@ public class TestNodeManager {
DatanodeDetails deadNode =
TestUtils.getDatanodeDetails(nodeManager);
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null, reportState);
+ healthyNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat(
- staleNode.getProtoBufMessage(), null, reportState);
+ staleNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat(
- deadNode.getProtoBufMessage(), null, reportState);
+ deadNode.getProtoBufMessage(), null);
// Sleep so that heartbeat processing thread gets to run.
Thread.sleep(500);
@@ -606,15 +596,15 @@ public class TestNodeManager {
*/
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null, reportState);
+ healthyNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat(
- staleNode.getProtoBufMessage(), null, reportState);
+ staleNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat(
- deadNode.getProtoBufMessage(), null, reportState);
+ deadNode.getProtoBufMessage(), null);
Thread.sleep(1500);
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null, reportState);
+ healthyNode.getProtoBufMessage(), null);
Thread.sleep(2 * 1000);
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
@@ -635,12 +625,12 @@ public class TestNodeManager {
*/
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null, reportState);
+ healthyNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat(
- staleNode.getProtoBufMessage(), null, reportState);
+ staleNode.getProtoBufMessage(), null);
Thread.sleep(1500);
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null, reportState);
+ healthyNode.getProtoBufMessage(), null);
Thread.sleep(2 * 1000);
// 3.5 seconds have elapsed for stale node, so it moves into Stale.
@@ -674,11 +664,11 @@ public class TestNodeManager {
* back all the nodes in healthy state.
*/
nodeManager.sendHeartbeat(
- healthyNode.getProtoBufMessage(), null, reportState);
+ healthyNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat(
- staleNode.getProtoBufMessage(), null, reportState);
+ staleNode.getProtoBufMessage(), null);
nodeManager.sendHeartbeat(
- deadNode.getProtoBufMessage(), null, reportState);
+ deadNode.getProtoBufMessage(), null);
Thread.sleep(500);
//Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size());
@@ -699,7 +689,7 @@ public class TestNodeManager {
int sleepDuration) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
for (DatanodeDetails dn : list) {
- manager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
+ manager.sendHeartbeat(dn.getProtoBufMessage(), null);
}
Thread.sleep(sleepDuration);
}
@@ -785,7 +775,7 @@ public class TestNodeManager {
// No Thread just one time HBs the node manager, so that these will be
// marked as dead nodes eventually.
for (DatanodeDetails dn : deadNodeList) {
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
+ nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
}
@@ -950,7 +940,7 @@ public class TestNodeManager {
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
nodeManager);
nodeManager.sendHeartbeat(
- datanodeDetails.getProtoBufMessage(), null, reportState);
+ datanodeDetails.getProtoBufMessage(), null);
String status = nodeManager.getChillModeStatus();
Assert.assertThat(status, containsString("Still in chill " +
"mode, waiting on nodes to report in."));
@@ -978,7 +968,7 @@ public class TestNodeManager {
for (int x = 0; x < 20; x++) {
DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager);
nodeManager.sendHeartbeat(datanode.getProtoBufMessage(),
- null, reportState);
+ null);
}
Thread.sleep(500);
@@ -1023,7 +1013,7 @@ public class TestNodeManager {
.createStorageReport(capacity, used, free, storagePath,
null, dnId, 1);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
- TestUtils.createNodeReport(reports), reportState);
+ TestUtils.createNodeReport(reports));
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
@@ -1073,7 +1063,7 @@ public class TestNodeManager {
null, dnId, 1);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
- TestUtils.createNodeReport(reports), reportState);
+ TestUtils.createNodeReport(reports));
Thread.sleep(100);
}
@@ -1154,7 +1144,7 @@ public class TestNodeManager {
.createStorageReport(capacity, expectedScmUsed, expectedRemaining,
storagePath, null, dnId, 1);
nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
- TestUtils.createNodeReport(reports), reportState);
+ TestUtils.createNodeReport(reports));
// Wait up to 5 seconds so that the dead node becomes healthy
// Verify usage info should be updated.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 9ac1467..e82dc98 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -71,9 +71,6 @@ import java.util.UUID;
import static org.apache.hadoop.hdds.scm.TestUtils.getDatanodeDetails;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import static org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState.states
- .noContainerReports;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils
.createEndpoint;
@@ -88,8 +85,6 @@ public class TestEndPoint {
private static RPC.Server scmServer;
private static ScmTestMock scmServerImpl;
private static File testDir;
- private static StorageContainerDatanodeProtocolProtos.ReportState
- defaultReportState;
@AfterClass
public static void tearDown() throws Exception {
@@ -106,9 +101,6 @@ public class TestEndPoint {
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
scmServerImpl, serverAddress, 10);
testDir = PathUtils.getTestDir(TestEndPoint.class);
- defaultReportState = StorageContainerDatanodeProtocolProtos.
- ReportState.newBuilder().setState(noContainerReports).
- setCount(0).build();
}
@Test
@@ -305,8 +297,7 @@ public class TestEndPoint {
String storageId = UUID.randomUUID().toString();
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
.sendHeartbeat(dataNode.getProtoBufMessage(),
- TestUtils.createNodeReport(getStorageReports(storageId)),
- defaultReportState);
+ TestUtils.createNodeReport(getStorageReports(storageId)));
Assert.assertNotNull(responseProto);
Assert.assertEquals(0, responseProto.getCommandsCount());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index b49b71b..3f814d0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
@@ -293,12 +291,11 @@ public class ReplicationNodeManagerMock implements NodeManager {
*
* @param dd - Datanode Details.
* @param nodeReport - node report.
- * @param containerReportState - container report state.
* @return SCMheartbeat response list
*/
@Override
public List<SCMCommand> sendHeartbeat(HddsProtos.DatanodeDetailsProto dd,
- SCMNodeReport nodeReport, ReportState containerReportState) {
+ SCMNodeReport nodeReport) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c7fd8e/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 0081f0d..a0d41a8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
@@ -302,11 +301,9 @@ public class TestStorageContainerManager {
GenericTestUtils.waitFor(() -> {
NodeManager nodeManager = cluster.getStorageContainerManager()
.getScmNodeManager();
- ReportState reportState = ReportState.newBuilder()
- .setState(ReportState.states.noContainerReports).setCount(0).build();
List<SCMCommand> commands = nodeManager.sendHeartbeat(
nodeManager.getNodes(NodeState.HEALTHY).get(0).getProtoBufMessage(),
- null, reportState);
+ null);
if (commands != null) {
for (SCMCommand cmd : commands) {
[11/18] hadoop git commit: HDDS-85. Send Container State Info while
sending the container report from Datanode to SCM. Contributed by Shashikant
Banerjee.
Posted by ha...@apache.org.
HDDS-85. Send Container State Info while sending the container report from Datanode to SCM. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fed2bef6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fed2bef6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fed2bef6
Branch: refs/heads/HDDS-48
Commit: fed2bef647d9a15fe020ad5d3bb89fcb77ed30e6
Parents: 745f203
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Wed May 23 14:15:35 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Wed May 23 14:15:35 2018 +0530
----------------------------------------------------------------------
.../main/proto/DatanodeContainerProtocol.proto | 1 +
.../container/common/helpers/ContainerData.java | 8 ++++
.../common/impl/ContainerManagerImpl.java | 45 ++++++++++++++++++--
.../common/interfaces/ContainerManager.java | 2 +-
.../commandhandler/ContainerReportHandler.java | 4 +-
.../container/ozoneimpl/OzoneContainer.java | 4 +-
.../common/impl/TestContainerPersistence.java | 2 +-
7 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fed2bef6/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 1138297..53da18a 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -131,6 +131,7 @@ enum Result {
UNCLOSED_CONTAINER_IO = 25;
DELETE_ON_OPEN_CONTAINER = 26;
CLOSED_CONTAINER_RETRY = 27;
+ INVALID_CONTAINER_STATE = 28;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fed2bef6/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
index 14ee33a..d1746f2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
@@ -340,6 +340,14 @@ public class ContainerData {
}
/**
+ * Checks if the container is closed.
+ * @return - boolean
+ */
+ public synchronized boolean isClosed() {
+ return ContainerLifeCycleState.CLOSED == state;
+ }
+
+ /**
* Marks this container as closed.
*/
public synchronized void closeContainer() {
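The new predicate lets callers select only fully CLOSED containers rather than everything that is merely non-open, which is exactly how getClosedContainerReports() uses it below. A short sketch, where allContainers is an illustrative List<ContainerData>:

    // CLOSING containers no longer match, unlike the old !isOpen() filter.
    List<ContainerData> closed = allContainers.stream()
        .filter(ContainerData::isClosed)
        .collect(java.util.stream.Collectors.toList());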
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fed2bef6/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index faee5d0..9355364 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -100,6 +102,8 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.UNCLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.UNSUPPORTED_REQUEST;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+ Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
/**
@@ -707,6 +711,39 @@ public class ContainerManagerImpl implements ContainerManager {
}
/**
+ * Returns LifeCycle State of the container
+ * @param containerID - Id of the container
+ * @return LifeCycle State of the container
+ * @throws StorageContainerException
+ */
+ private HddsProtos.LifeCycleState getState(long containerID)
+ throws StorageContainerException {
+ LifeCycleState state;
+ final ContainerData data = containerMap.get(containerID);
+ if (data == null) {
+ throw new StorageContainerException(
+ "Container status not found: " + containerID, CONTAINER_NOT_FOUND);
+ }
+ switch (data.getState()) {
+ case OPEN:
+ state = LifeCycleState.OPEN;
+ break;
+ case CLOSING:
+ state = LifeCycleState.CLOSING;
+ break;
+ case CLOSED:
+ state = LifeCycleState.CLOSED;
+ break;
+ default:
+ throw new StorageContainerException(
+ "Invalid Container state found: " + containerID,
+ INVALID_CONTAINER_STATE);
+ }
+
+ return state;
+ }
+
+ /**
* Supports clean shutdown of container.
*
* @throws IOException
@@ -835,14 +872,14 @@ public class ContainerManagerImpl implements ContainerManager {
* @throws IOException
*/
@Override
- public List<ContainerData> getContainerReports() throws IOException {
+ public List<ContainerData> getClosedContainerReports() throws IOException {
LOG.debug("Starting container report iteration.");
// No need for locking since containerMap is a ConcurrentSkipListMap
// And we can never get the exact state since close might happen
// after we iterate a point.
return containerMap.entrySet().stream()
.filter(containerData ->
- !containerData.getValue().isOpen())
+ containerData.getValue().isClosed())
.map(containerData -> containerData.getValue())
.collect(Collectors.toList());
}
@@ -870,6 +907,7 @@ public class ContainerManagerImpl implements ContainerManager {
.setType(ContainerReportsRequestProto.reportType.fullReport);
for (ContainerData container: containers) {
+ long containerId = container.getContainerID();
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
ciBuilder.setContainerID(container.getContainerID())
@@ -879,7 +917,8 @@ public class ContainerManagerImpl implements ContainerManager {
.setReadCount(container.getReadCount())
.setWriteCount(container.getWriteCount())
.setReadBytes(container.getReadBytes())
- .setWriteBytes(container.getWriteBytes());
+ .setWriteBytes(container.getWriteBytes())
+ .setState(getState(containerId));
crBuilder.addReports(ciBuilder.build());
}
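
For context, getState() above translates the datanode's internal container state into the HddsProtos.LifeCycleState wire enum and refuses anything it cannot map. The shape of that translation, sketched in isolation with placeholder enums (not the actual HDDS types):

final class StateMapping {
  enum InternalState { OPEN, CLOSING, CLOSED, DELETING }
  enum WireState { OPEN, CLOSING, CLOSED }

  static WireState toWire(InternalState s) {
    switch (s) {
    case OPEN:    return WireState.OPEN;
    case CLOSING: return WireState.CLOSING;
    case CLOSED:  return WireState.CLOSED;
    default:
      // States with no wire representation must fail loudly rather than
      // be silently coerced, mirroring INVALID_CONTAINER_STATE above.
      throw new IllegalStateException("Invalid container state: " + s);
    }
  }
}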
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fed2bef6/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
index 3a1a73d..ba70953 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -185,7 +185,7 @@ public interface ContainerManager extends RwLock {
* @return List of all closed containers.
* @throws IOException
*/
- List<ContainerData> getContainerReports() throws IOException;
+ List<ContainerData> getClosedContainerReports() throws IOException;
/**
* Increase pending deletion blocks count number of specified container.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fed2bef6/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
index ba6b418..fbea290 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
@@ -63,13 +63,13 @@ public class ContainerReportHandler implements CommandHandler {
invocationCount++;
long startTime = Time.monotonicNow();
try {
- ContainerReportsRequestProto contianerReport =
+ ContainerReportsRequestProto containerReport =
container.getContainerReport();
// TODO : We send this report to all SCMs.Check if it is enough only to
// send to the leader once we have RAFT enabled SCMs.
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
- endPoint.getEndPoint().sendContainerReport(contianerReport);
+ endPoint.getEndPoint().sendContainerReport(containerReport);
}
} catch (IOException ex) {
LOG.error("Unable to process the Container Report command.", ex);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fed2bef6/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index b497cdc..6758479 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -265,8 +265,8 @@ public class OzoneContainer {
* @return - List of closed containers.
* @throws IOException
*/
- public List<ContainerData> getContainerReports() throws IOException {
- return this.manager.getContainerReports();
+ public List<ContainerData> getClosedContainerReports() throws IOException {
+ return this.manager.getClosedContainerReports();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fed2bef6/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 89ee673..4975fd3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -307,7 +307,7 @@ public class TestContainerPersistence {
}
// The container report only returns reports of closed containers.
- List<ContainerData> reports = containerManager.getContainerReports();
+ List<ContainerData> reports = containerManager.getClosedContainerReports();
Assert.assertEquals(4, reports.size());
for(ContainerData report : reports) {
long actualContainerID = report.getContainerID();
[17/18] hadoop git commit: HDFS-13588. Fix TestFsDatasetImpl test failures on Windows. Contributed by Xiao Liang.
Posted by ha...@apache.org.
HDFS-13588. Fix TestFsDatasetImpl test failures on Windows. Contributed by Xiao Liang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0c9b7a8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0c9b7a8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0c9b7a8
Branch: refs/heads/HDDS-48
Commit: c0c9b7a8ef2618b7641a0452d9277abd26815de2
Parents: e83b943
Author: Inigo Goiri <in...@apache.org>
Authored: Wed May 23 09:46:35 2018 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Wed May 23 09:46:35 2018 -0700
----------------------------------------------------------------------
.../server/datanode/fsdataset/impl/TestFsDatasetImpl.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0c9b7a8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index d684950..9270be8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
@@ -666,7 +667,8 @@ public class TestFsDatasetImpl {
TimeUnit.MILLISECONDS);
config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
- cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+ cluster = new MiniDFSCluster.Builder(config,
+ GenericTestUtils.getRandomizedTestDir()).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
DataNode dataNode = cluster.getDataNodes().get(0);
@@ -688,7 +690,7 @@ public class TestFsDatasetImpl {
// Remove write and execute access so that checkDiskErrorThread detects
// this volume is bad.
finalizedDir.setExecutable(false);
- finalizedDir.setWritable(false);
+ assertTrue(FileUtil.setWritable(finalizedDir, false));
}
Assert.assertTrue("Reference count for the volume should be greater "
+ "than 0", volume.getReferenceCount() > 0);
@@ -709,7 +711,7 @@ public class TestFsDatasetImpl {
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(info.getXferAddr(), ioe);
}
- finalizedDir.setWritable(true);
+ assertTrue(FileUtil.setWritable(finalizedDir, true));
finalizedDir.setExecutable(true);
} finally {
cluster.shutdown();
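
The substantive fix here: java.io.File#setWritable is unreliable for directories on Windows (it can fail and return false without throwing), so the test now routes through org.apache.hadoop.fs.FileUtil and asserts the returned boolean. A minimal sketch of that guarded idiom, assuming only a scratch directory:

import java.io.File;
import org.apache.hadoop.fs.FileUtil;

public class PermissionToggle {
  public static void main(String[] args) throws Exception {
    File dir = new File(System.getProperty("java.io.tmpdir"), "perm-test");
    if (!dir.mkdirs() && !dir.isDirectory()) {
      throw new IllegalStateException("could not create " + dir);
    }
    // FileUtil uses a Windows-aware implementation; checking the return
    // value surfaces failures that File#setWritable would hide.
    if (!FileUtil.setWritable(dir, false)) {
      throw new IllegalStateException("revoking write access failed");
    }
    try {
      // ... exercise the code path that needs a read-only directory ...
    } finally {
      if (!FileUtil.setWritable(dir, true)) {
        throw new IllegalStateException("restoring write access failed");
      }
    }
  }
}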
[10/18] hadoop git commit: Additional check when unpacking archives. Contributed by Jason Lowe and Akira Ajisaka.
Posted by ha...@apache.org.
Additional check when unpacking archives. Contributed by Jason Lowe and Akira Ajisaka.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/745f203e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/745f203e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/745f203e
Branch: refs/heads/HDDS-48
Commit: 745f203e577bacb35b042206db94615141fa5e6f
Parents: 1d2640b
Author: Akira Ajisaka <aa...@apache.org>
Authored: Wed May 23 17:15:57 2018 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Wed May 23 17:16:23 2018 +0900
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/FileUtil.java | 18 ++++++++-
.../java/org/apache/hadoop/fs/TestFileUtil.java | 40 +++++++++++++++++---
2 files changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/745f203e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
index 8743be5..5ef78f2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -617,11 +617,16 @@ public class FileUtil {
throws IOException {
try (ZipInputStream zip = new ZipInputStream(inputStream)) {
int numOfFailedLastModifiedSet = 0;
+ String targetDirPath = toDir.getCanonicalPath() + File.separator;
for(ZipEntry entry = zip.getNextEntry();
entry != null;
entry = zip.getNextEntry()) {
if (!entry.isDirectory()) {
File file = new File(toDir, entry.getName());
+ if (!file.getCanonicalPath().startsWith(targetDirPath)) {
+ throw new IOException("expanding " + entry.getName()
+ + " would create file outside of " + toDir);
+ }
File parent = file.getParentFile();
if (!parent.mkdirs() &&
!parent.isDirectory()) {
@@ -656,12 +661,17 @@ public class FileUtil {
try {
entries = zipFile.entries();
+ String targetDirPath = unzipDir.getCanonicalPath() + File.separator;
while (entries.hasMoreElements()) {
ZipEntry entry = entries.nextElement();
if (!entry.isDirectory()) {
InputStream in = zipFile.getInputStream(entry);
try {
File file = new File(unzipDir, entry.getName());
+ if (!file.getCanonicalPath().startsWith(targetDirPath)) {
+ throw new IOException("expanding " + entry.getName()
+ + " would create file outside of " + unzipDir);
+ }
if (!file.getParentFile().mkdirs()) {
if (!file.getParentFile().isDirectory()) {
throw new IOException("Mkdirs failed to create " +
@@ -944,6 +954,13 @@ public class FileUtil {
private static void unpackEntries(TarArchiveInputStream tis,
TarArchiveEntry entry, File outputDir) throws IOException {
+ String targetDirPath = outputDir.getCanonicalPath() + File.separator;
+ File outputFile = new File(outputDir, entry.getName());
+ if (!outputFile.getCanonicalPath().startsWith(targetDirPath)) {
+ throw new IOException("expanding " + entry.getName()
+ + " would create entry outside of " + outputDir);
+ }
+
if (entry.isDirectory()) {
File subDir = new File(outputDir, entry.getName());
if (!subDir.mkdirs() && !subDir.isDirectory()) {
@@ -966,7 +983,6 @@ public class FileUtil {
return;
}
- File outputFile = new File(outputDir, entry.getName());
if (!outputFile.getParentFile().exists()) {
if (!outputFile.getParentFile().mkdirs()) {
throw new IOException("Mkdirs failed to create tar internal dir "
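
All three unpack paths touched above (unZip over a stream, unZip over a ZipFile, and unpackEntries for tars) apply the same containment check before writing. The guard in isolation, as a hedged sketch:

import java.io.File;
import java.io.IOException;

final class EntryPathGuard {
  // Resolve an archive entry under baseDir, rejecting names such as
  // "../evil" whose canonical path escapes the extraction directory.
  static File resolve(File baseDir, String entryName) throws IOException {
    String basePath = baseDir.getCanonicalPath() + File.separator;
    File target = new File(baseDir, entryName);
    if (!target.getCanonicalPath().startsWith(basePath)) {
      throw new IOException("expanding " + entryName
          + " would create file outside of " + baseDir);
    }
    return target;
  }
}

Note the trailing File.separator on the base path: without it, a sibling directory such as /base-dir2 would pass a bare prefix check against /base-dir.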
http://git-wip-us.apache.org/repos/asf/hadoop/blob/745f203e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileUtil.java
index 39f2f6b..7218a1b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileUtil.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.util.ArrayList;
@@ -685,10 +687,8 @@ public class TestFileUtil {
@Test (timeout = 30000)
public void testUnZip() throws IOException {
- // make sa simple zip
setupDirs();
-
- // make a simple tar:
+ // make a simple zip
final File simpleZip = new File(del, FILE);
OutputStream os = new FileOutputStream(simpleZip);
ZipOutputStream tos = new ZipOutputStream(os);
@@ -705,7 +705,7 @@ public class TestFileUtil {
tos.close();
}
- // successfully untar it into an existing dir:
+ // successfully unzip it into an existing dir:
FileUtil.unZip(simpleZip, tmp);
// check result:
assertTrue(new File(tmp, "foo").exists());
@@ -720,8 +720,36 @@ public class TestFileUtil {
} catch (IOException ioe) {
// okay
}
- }
-
+ }
+
+ @Test (timeout = 30000)
+ public void testUnZip2() throws IOException {
+ setupDirs();
+ // make a simple zip
+ final File simpleZip = new File(del, FILE);
+ OutputStream os = new FileOutputStream(simpleZip);
+ try (ZipOutputStream tos = new ZipOutputStream(os)) {
+ // Add an entry that contains invalid filename
+ ZipEntry ze = new ZipEntry("../foo");
+ byte[] data = "some-content".getBytes(StandardCharsets.UTF_8);
+ ze.setSize(data.length);
+ tos.putNextEntry(ze);
+ tos.write(data);
+ tos.closeEntry();
+ tos.flush();
+ tos.finish();
+ }
+
+ // Unzip it into an existing dir
+ try {
+ FileUtil.unZip(simpleZip, tmp);
+ fail("unZip should throw IOException.");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains(
+ "would create file outside of", e);
+ }
+ }
+
@Test (timeout = 30000)
/*
* Test method copy(FileSystem srcFS, Path src, File dst, boolean deleteSource, Configuration conf)
[09/18] hadoop git commit: HDFS-13601. Optimize ByteString conversions in PBHelper.
Posted by ha...@apache.org.
HDFS-13601. Optimize ByteString conversions in PBHelper.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1d2640b6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1d2640b6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1d2640b6
Branch: refs/heads/HDDS-48
Commit: 1d2640b6132e8308c07476badd2d1482be68a298
Parents: 5a91406
Author: Andrew Wang <wa...@apache.org>
Authored: Tue May 22 23:55:20 2018 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Tue May 22 23:55:20 2018 -0700
----------------------------------------------------------------------
.../dev-support/findbugsExcludeFile.xml | 5 ++
.../apache/hadoop/hdfs/protocol/DatanodeID.java | 50 +++++++++++++--
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 67 +++++++++++++++++---
.../TestDataXceiverBackwardsCompat.java | 10 +++
4 files changed, 118 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1d2640b6/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
index 8e2bc94..fa9654b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
@@ -91,5 +91,10 @@
<Method name="getSymlinkInBytes" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.protocolPB.PBHelperClient" />
+ <Method name="getFixedByteString" />
+ <Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION" />
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1d2640b6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index af720c7..718661e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.protocol;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -44,7 +45,9 @@ public class DatanodeID implements Comparable<DatanodeID> {
"null", "null", 0, 0, 0, 0);
private String ipAddr; // IP address
+ private ByteString ipAddrBytes; // ipAddr ByteString to save on PB serde
private String hostName; // hostname claimed by datanode
+ private ByteString hostNameBytes; // hostName ByteString to save on PB serde
private String peerHostName; // hostname from the actual connection
private int xferPort; // data streaming port
private int infoPort; // info server port
@@ -58,6 +61,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
* For newly formatted Datanodes it is a UUID.
*/
private final String datanodeUuid;
+ // datanodeUuid ByteString to save on PB serde
+ private final ByteString datanodeUuidBytes;
public DatanodeID(DatanodeID from) {
this(from.getDatanodeUuid(), from);
@@ -66,8 +71,11 @@ public class DatanodeID implements Comparable<DatanodeID> {
@VisibleForTesting
public DatanodeID(String datanodeUuid, DatanodeID from) {
this(from.getIpAddr(),
+ from.getIpAddrBytes(),
from.getHostName(),
+ from.getHostNameBytes(),
datanodeUuid,
+ getByteString(datanodeUuid),
from.getXferPort(),
from.getInfoPort(),
from.getInfoSecurePort(),
@@ -89,22 +97,43 @@ public class DatanodeID implements Comparable<DatanodeID> {
*/
public DatanodeID(String ipAddr, String hostName, String datanodeUuid,
int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
- setIpAndXferPort(ipAddr, xferPort);
+ this(ipAddr, getByteString(ipAddr),
+ hostName, getByteString(hostName),
+ datanodeUuid, getByteString(datanodeUuid),
+ xferPort, infoPort, infoSecurePort, ipcPort);
+ }
+
+ private DatanodeID(String ipAddr, ByteString ipAddrBytes,
+ String hostName, ByteString hostNameBytes,
+ String datanodeUuid, ByteString datanodeUuidBytes,
+ int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
+ setIpAndXferPort(ipAddr, ipAddrBytes, xferPort);
this.hostName = hostName;
+ this.hostNameBytes = hostNameBytes;
this.datanodeUuid = checkDatanodeUuid(datanodeUuid);
+ this.datanodeUuidBytes = datanodeUuidBytes;
this.infoPort = infoPort;
this.infoSecurePort = infoSecurePort;
this.ipcPort = ipcPort;
}
+ private static ByteString getByteString(String str) {
+ if (str != null) {
+ return ByteString.copyFromUtf8(str);
+ }
+ return ByteString.EMPTY;
+ }
+
public void setIpAddr(String ipAddr) {
//updated during registration, preserve former xferPort
- setIpAndXferPort(ipAddr, xferPort);
+ setIpAndXferPort(ipAddr, getByteString(ipAddr), xferPort);
}
- private void setIpAndXferPort(String ipAddr, int xferPort) {
+ private void setIpAndXferPort(String ipAddr, ByteString ipAddrBytes,
+ int xferPort) {
// build xferAddr string to reduce cost of frequent use
this.ipAddr = ipAddr;
+ this.ipAddrBytes = ipAddrBytes;
this.xferPort = xferPort;
this.xferAddr = ipAddr + ":" + xferPort;
}
@@ -120,6 +149,10 @@ public class DatanodeID implements Comparable<DatanodeID> {
return datanodeUuid;
}
+ public ByteString getDatanodeUuidBytes() {
+ return datanodeUuidBytes;
+ }
+
private String checkDatanodeUuid(String uuid) {
if (uuid == null || uuid.isEmpty()) {
return null;
@@ -135,6 +168,10 @@ public class DatanodeID implements Comparable<DatanodeID> {
return ipAddr;
}
+ public ByteString getIpAddrBytes() {
+ return ipAddrBytes;
+ }
+
/**
* @return hostname
*/
@@ -142,6 +179,10 @@ public class DatanodeID implements Comparable<DatanodeID> {
return hostName;
}
+ public ByteString getHostNameBytes() {
+ return hostNameBytes;
+ }
+
/**
* @return hostname from the actual connection
*/
@@ -258,7 +299,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
* Note that this does not update storageID.
*/
public void updateRegInfo(DatanodeID nodeReg) {
- setIpAndXferPort(nodeReg.getIpAddr(), nodeReg.getXferPort());
+ setIpAndXferPort(nodeReg.getIpAddr(), nodeReg.getIpAddrBytes(),
+ nodeReg.getXferPort());
hostName = nodeReg.getHostName();
peerHostName = nodeReg.getPeerHostName();
infoPort = nodeReg.getInfoPort();
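
The DatanodeID change pairs each hot String field with a ByteString computed once at construction (or on update), so repeated protobuf conversions stop re-encoding UTF-8. A stripped-down sketch of the pairing, with an illustrative field name rather than the real DatanodeID fields:

import com.google.protobuf.ByteString;

final class HostId {
  private final String host;
  private final ByteString hostBytes; // encoded once, reused on every serde

  HostId(String host) {
    this.host = host;
    this.hostBytes =
        host != null ? ByteString.copyFromUtf8(host) : ByteString.EMPTY;
  }

  String getHost() { return host; }
  ByteString getHostBytes() { return hostBytes; }
}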
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1d2640b6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index ff9733c..579ac43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -27,8 +27,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.primitives.Shorts;
import com.google.protobuf.ByteString;
@@ -228,6 +232,49 @@ public class PBHelperClient {
private static final FsAction[] FSACTION_VALUES =
FsAction.values();
+ /**
+ * Map used to cache fixed strings to ByteStrings. Since there is no
+ * automatic expiration policy, only use this for strings from a fixed, small
+ * set.
+ * <p/>
+ * This map should not be accessed directly. Use the getFixedByteString
+ * methods instead.
+ */
+ private static ConcurrentHashMap<Object, ByteString> fixedByteStringCache =
+ new ConcurrentHashMap<>();
+
+ private static ByteString getFixedByteString(Text key) {
+ ByteString value = fixedByteStringCache.get(key);
+ if (value == null) {
+ value = ByteString.copyFromUtf8(key.toString());
+ fixedByteStringCache.put(key, value);
+ }
+ return value;
+ }
+
+ private static ByteString getFixedByteString(String key) {
+ ByteString value = fixedByteStringCache.get(key);
+ if (value == null) {
+ value = ByteString.copyFromUtf8(key);
+ fixedByteStringCache.put(key, value);
+ }
+ return value;
+ }
+
+ /**
+ * Guava cache for caching String to ByteString encoding. Use this when the
+ * set of Strings is large, mutable, or unknown.
+ */
+ private static LoadingCache<String, ByteString> bytestringCache =
+ CacheBuilder.newBuilder()
+ .maximumSize(10000)
+ .build(
+ new CacheLoader<String, ByteString>() {
+ public ByteString load(String key) {
+ return ByteString.copyFromUtf8(key);
+ }
+ });
+
private PBHelperClient() {
/** Hidden constructor */
}
@@ -294,7 +341,7 @@ public class PBHelperClient {
public static ExtendedBlockProto convert(final ExtendedBlock b) {
if (b == null) return null;
return ExtendedBlockProto.newBuilder().
- setPoolId(b.getBlockPoolId()).
+ setPoolIdBytes(getFixedByteString(b.getBlockPoolId())).
setBlockId(b.getBlockId()).
setNumBytes(b.getNumBytes()).
setGenerationStamp(b.getGenerationStamp()).
@@ -305,8 +352,8 @@ public class PBHelperClient {
return TokenProto.newBuilder().
setIdentifier(getByteString(tok.getIdentifier())).
setPassword(getByteString(tok.getPassword())).
- setKind(tok.getKind().toString()).
- setService(tok.getService().toString()).build();
+ setKindBytes(getFixedByteString(tok.getKind())).
+ setServiceBytes(getFixedByteString(tok.getService())).build();
}
public static ShortCircuitShmIdProto convert(ShmId shmId) {
@@ -329,11 +376,10 @@ public class PBHelperClient {
// which is the same as the DatanodeUuid. Since StorageID is a required
// field we pass the empty string if the DatanodeUuid is not yet known.
return DatanodeIDProto.newBuilder()
- .setIpAddr(dn.getIpAddr())
- .setHostName(dn.getHostName())
+ .setIpAddrBytes(dn.getIpAddrBytes())
+ .setHostNameBytes(dn.getHostNameBytes())
.setXferPort(dn.getXferPort())
- .setDatanodeUuid(dn.getDatanodeUuid() != null ?
- dn.getDatanodeUuid() : "")
+ .setDatanodeUuidBytes(dn.getDatanodeUuidBytes())
.setInfoPort(dn.getInfoPort())
.setInfoSecurePort(dn.getInfoSecurePort())
.setIpcPort(dn.getIpcPort()).build();
@@ -357,7 +403,8 @@ public class PBHelperClient {
public static DatanodeInfoProto convert(DatanodeInfo info) {
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
if (info.getNetworkLocation() != null) {
- builder.setLocation(info.getNetworkLocation());
+ builder.setLocationBytes(
+ bytestringCache.getUnchecked(info.getNetworkLocation()));
}
if (info.getUpgradeDomain() != null) {
builder.setUpgradeDomain(info.getUpgradeDomain());
@@ -2260,8 +2307,8 @@ public class PBHelperClient {
setModificationTime(fs.getModificationTime()).
setAccessTime(fs.getAccessTime()).
setPermission(convert(fs.getPermission())).
- setOwner(fs.getOwner()).
- setGroup(fs.getGroup()).
+ setOwnerBytes(getFixedByteString(fs.getOwner())).
+ setGroupBytes(getFixedByteString(fs.getGroup())).
setFileId(fs.getFileId()).
setChildrenNum(fs.getChildrenNum()).
setPath(getByteString(fs.getLocalNameInBytes())).
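
Two caches appear above because the key populations differ: owners, groups, token kinds, and pool ids come from a small fixed set, where an unbounded ConcurrentHashMap is safe, while network locations are open-ended and get a bounded Guava LoadingCache with eviction. A self-contained sketch of the same split; using computeIfAbsent here also closes the benign check-then-put race that the findbugs exclusion above has to paper over:

import java.util.concurrent.ConcurrentHashMap;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;

final class ByteStringCaches {
  // Unbounded: only safe for a small, fixed universe of keys.
  private static final ConcurrentHashMap<String, ByteString> FIXED =
      new ConcurrentHashMap<>();

  static ByteString fixed(String key) {
    return FIXED.computeIfAbsent(key, ByteString::copyFromUtf8);
  }

  // Bounded: for large or unknown key sets, old entries are evicted.
  private static final LoadingCache<String, ByteString> BOUNDED =
      CacheBuilder.newBuilder()
          .maximumSize(10_000)
          .build(new CacheLoader<String, ByteString>() {
            @Override
            public ByteString load(String key) {
              return ByteString.copyFromUtf8(key);
            }
          });

  static ByteString bounded(String key) {
    return BOUNDED.getUnchecked(key);
  }
}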
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1d2640b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
index bdcbe7f..0f65269 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.net.*;
@@ -47,6 +48,7 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
+import java.util.UUID;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
@@ -171,9 +173,17 @@ public class TestDataXceiverBackwardsCompat {
DatanodeInfo datanodeInfo = mock(DatanodeInfo.class);
doReturn("localhost").when(datanodeInfo).getHostName();
+ doReturn(ByteString.copyFromUtf8("localhost"))
+ .when(datanodeInfo).getHostNameBytes();
doReturn("127.0.0.1").when(datanodeInfo).getIpAddr();
+ doReturn(ByteString.copyFromUtf8("127.0.0.1"))
+ .when(datanodeInfo).getIpAddrBytes();
doReturn(DatanodeInfo.AdminStates.NORMAL).when(datanodeInfo)
.getAdminState();
+ final String uuid = UUID.randomUUID().toString();
+ doReturn(uuid).when(datanodeInfo).getDatanodeUuid();
+ doReturn(ByteString.copyFromUtf8(uuid))
+ .when(datanodeInfo).getDatanodeUuidBytes();
Exception storedException = null;
try {
[15/18] hadoop git commit: HDDS-84. The root directory of ozone.tar.gz should contain the version string. Contributed by Elek, Marton.
Posted by ha...@apache.org.
HDDS-84. The root directory of ozone.tar.gz should contain the version string. Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63fc5873
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63fc5873
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63fc5873
Branch: refs/heads/HDDS-48
Commit: 63fc5873cee41b883e988ead00fc6f6cf74fae97
Parents: f61e3e7
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Wed May 23 21:07:37 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Wed May 23 21:07:37 2018 +0530
----------------------------------------------------------------------
dev-support/bin/ozone-dist-tar-stitching | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63fc5873/dev-support/bin/ozone-dist-tar-stitching
----------------------------------------------------------------------
diff --git a/dev-support/bin/ozone-dist-tar-stitching b/dev-support/bin/ozone-dist-tar-stitching
index decfa23..d1116e4 100755
--- a/dev-support/bin/ozone-dist-tar-stitching
+++ b/dev-support/bin/ozone-dist-tar-stitching
@@ -41,7 +41,7 @@ function run()
#To include the version name in the root directory of the tar file
# we create a symbolic link and dereference it during the tar creation
ln -s -f ozone ozone-${VERSION}
-run tar -c --dereference -f "ozone-${VERSION}.tar" "ozone"
+run tar -c --dereference -f "ozone-${VERSION}.tar" "ozone-${VERSION}"
run gzip -f "ozone-${VERSION}.tar"
echo
echo "Ozone dist tar available at: ${BASEDIR}/ozone-${VERSION}.tar.gz"
[07/18] hadoop git commit: YARN-8310. Handle old NMTokenIdentifier, AMRMTokenIdentifier, and ContainerTokenIdentifier formats. Contributed by Robert Kanter.
Posted by ha...@apache.org.
YARN-8310. Handle old NMTokenIdentifier, AMRMTokenIdentifier, and ContainerTokenIdentifier formats. Contributed by Robert Kanter.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3e5f7ea9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3e5f7ea9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3e5f7ea9
Branch: refs/heads/HDDS-48
Commit: 3e5f7ea986600e084fcac723b0423e7de1b3bb8a
Parents: 68c7fd8
Author: Miklos Szegedi <mi...@cloudera.com>
Authored: Tue May 22 18:10:33 2018 -0700
Committer: Miklos Szegedi <mi...@cloudera.com>
Committed: Tue May 22 18:10:33 2018 -0700
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/io/IOUtils.java | 20 +++
.../yarn/security/AMRMTokenIdentifier.java | 33 ++++-
.../yarn/security/ContainerTokenIdentifier.java | 98 ++++++++++++---
.../hadoop/yarn/security/NMTokenIdentifier.java | 32 ++++-
.../yarn/security/TestYARNTokenIdentifier.java | 121 ++++++++++++++++++-
5 files changed, 278 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e5f7ea9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
index 7288812..3708a3b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
@@ -513,4 +513,24 @@ public class IOUtils {
throw exception;
}
}
+
+ /**
+ * Reads a DataInput until EOF and returns a byte array. Make sure not to
+ * pass in an infinite DataInput or this will never return.
+ *
+ * @param in A DataInput
+ * @return a byte array containing the data from the DataInput
+ * @throws IOException on I/O error, other than EOF
+ */
+ public static byte[] readFullyToByteArray(DataInput in) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ while (true) {
+ baos.write(in.readByte());
+ }
+ } catch (EOFException eof) {
+ // finished reading, do nothing
+ }
+ return baos.toByteArray();
+ }
}
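
Usage-wise, the point of the new helper is that it buffers a one-shot stream into reusable bytes; the token classes below depend on that to attempt two parses over the same data. A small sketch using the helper added by this patch:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.IOUtils;

public class DrainExample {
  public static void main(String[] args) throws Exception {
    byte[] payload = "token-bytes".getBytes(StandardCharsets.UTF_8);
    // A DataInputStream over a finite source: readFullyToByteArray
    // consumes it to EOF and hands back the raw bytes, so the caller
    // can attempt more than one parse over the same data.
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(payload));
    byte[] copy = IOUtils.readFullyToByteArray(in);
    System.out.println(copy.length == payload.length); // true
  }
}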
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e5f7ea9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
index 56411a7..ed83b06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
@@ -18,20 +18,26 @@
package org.apache.hadoop.yarn.security;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.AMRMTokenIdentifierProto;
@@ -45,6 +51,8 @@ import com.google.protobuf.TextFormat;
@Evolving
public class AMRMTokenIdentifier extends TokenIdentifier {
+ private static final Log LOG = LogFactory.getLog(AMRMTokenIdentifier.class);
+
public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN");
private AMRMTokenIdentifierProto proto;
@@ -78,7 +86,30 @@ public class AMRMTokenIdentifier extends TokenIdentifier {
@Override
public void readFields(DataInput in) throws IOException {
- proto = AMRMTokenIdentifierProto.parseFrom((DataInputStream)in);
+ byte[] data = IOUtils.readFullyToByteArray(in);
+ try {
+ proto = AMRMTokenIdentifierProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ LOG.warn("Recovering old formatted token");
+ readFieldsInOldFormat(
+ new DataInputStream(new ByteArrayInputStream(data)));
+ }
+ }
+
+ private void readFieldsInOldFormat(DataInputStream in) throws IOException {
+ AMRMTokenIdentifierProto.Builder builder =
+ AMRMTokenIdentifierProto.newBuilder();
+ long clusterTimeStamp = in.readLong();
+ int appId = in.readInt();
+ int attemptId = in.readInt();
+ ApplicationId applicationId =
+ ApplicationId.newInstance(clusterTimeStamp, appId);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(applicationId, attemptId);
+ builder.setAppAttemptId(
+ ((ApplicationAttemptIdPBImpl)appAttemptId).getProto());
+ builder.setKeyId(in.readInt());
+ proto = builder.build();
}
@Override
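
All three token identifiers in this patch share one recovery shape: buffer the input, attempt the protobuf parse, and only on failure replay the identical bytes through the legacy reader. That shape, generalized into a hedged sketch (the abstract parsers are placeholders, not YARN APIs):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public abstract class DualFormatReader<T> {
  /** Parse the current (protobuf) wire format; throw if the bytes are
   *  not valid protobuf. */
  protected abstract T parseNewFormat(byte[] data) throws IOException;

  /** Parse the legacy Writable layout from the same bytes. */
  protected abstract T parseOldFormat(DataInputStream in) throws IOException;

  /** Buffer once, try the new format, fall back to the old one. The
   *  up-front buffering is what makes the second attempt possible: the
   *  original stream can only be consumed once. */
  public T read(byte[] data) throws IOException {
    try {
      return parseNewFormat(data);
    } catch (IOException notProtobuf) {
      return parseOldFormat(
          new DataInputStream(new ByteArrayInputStream(data)));
    }
  }
}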
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e5f7ea9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
index c5a649d..37c74b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
@@ -1,40 +1,45 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.security;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -48,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -325,7 +331,63 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
@Override
public void readFields(DataInput in) throws IOException {
- proto = ContainerTokenIdentifierProto.parseFrom((DataInputStream)in);
+ byte[] data = IOUtils.readFullyToByteArray(in);
+ try {
+ proto = ContainerTokenIdentifierProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ LOG.warn("Recovering old formatted token");
+ readFieldsInOldFormat(
+ new DataInputStream(new ByteArrayInputStream(data)));
+ }
+ }
+
+ private void readFieldsInOldFormat(DataInputStream in) throws IOException {
+ ContainerTokenIdentifierProto.Builder builder =
+ ContainerTokenIdentifierProto.newBuilder();
+ builder.setNodeLabelExpression(CommonNodeLabelsManager.NO_LABEL);
+ builder.setContainerType(ProtoUtils.convertToProtoFormat(
+ ContainerType.TASK));
+ builder.setExecutionType(ProtoUtils.convertToProtoFormat(
+ ExecutionType.GUARANTEED));
+ builder.setAllocationRequestId(-1);
+ builder.setVersion(0);
+
+ ApplicationId applicationId =
+ ApplicationId.newInstance(in.readLong(), in.readInt());
+ ApplicationAttemptId applicationAttemptId =
+ ApplicationAttemptId.newInstance(applicationId, in.readInt());
+ ContainerId containerId =
+ ContainerId.newContainerId(applicationAttemptId, in.readLong());
+ builder.setContainerId(ProtoUtils.convertToProtoFormat(containerId));
+ builder.setNmHostAddr(in.readUTF());
+ builder.setAppSubmitter(in.readUTF());
+ int memory = in.readInt();
+ int vCores = in.readInt();
+ Resource resource = Resource.newInstance(memory, vCores);
+ builder.setResource(ProtoUtils.convertToProtoFormat(resource));
+ builder.setExpiryTimeStamp(in.readLong());
+ builder.setMasterKeyId(in.readInt());
+ builder.setRmIdentifier(in.readLong());
+ Priority priority = Priority.newInstance(in.readInt());
+ builder.setPriority(((PriorityPBImpl)priority).getProto());
+ builder.setCreationTime(in.readLong());
+
+ int logAggregationSize = -1;
+ try {
+ logAggregationSize = in.readInt();
+ } catch (EOFException eof) {
+ // In the old format, there was no versioning or proper handling of new
+ // fields. Depending on how old, the log aggregation size and data may
+ // or may not exist. To handle that, we try to read it and ignore the
+ // EOFException that's thrown if it's not there.
+ }
+ if (logAggregationSize != -1) {
+ byte[] bytes = new byte[logAggregationSize];
+ in.readFully(bytes);
+ builder.setLogAggregationContext(
+ LogAggregationContextProto.parseFrom(bytes));
+ }
+ proto = builder.build();
}
@Override
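
One wrinkle above: the legacy container-token layout had no version field, so the optional trailing log-aggregation block can only be detected by reading and treating EOF as absence. That probe, reduced to a sketch:

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

final class OptionalTrailingBlob {
  // Returns null when the stream ends before the length prefix, or when
  // the writer recorded -1; both mean "not present" in the old format.
  static byte[] read(DataInputStream in) throws IOException {
    int len = -1;
    try {
      len = in.readInt();
    } catch (EOFException eof) {
      // old writers simply stopped here; absence is not an error
    }
    if (len == -1) {
      return null;
    }
    byte[] blob = new byte[len];
    in.readFully(blob);
    return blob;
  }
}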
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e5f7ea9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
index e28123e..cd1ad03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
@@ -18,19 +18,23 @@
package org.apache.hadoop.yarn.security;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
@@ -99,7 +103,33 @@ public class NMTokenIdentifier extends TokenIdentifier {
@Override
public void readFields(DataInput in) throws IOException {
- proto = NMTokenIdentifierProto.parseFrom((DataInputStream)in);
+ byte[] data = IOUtils.readFullyToByteArray(in);
+ try {
+ proto = NMTokenIdentifierProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ LOG.warn("Recovering old formatted token");
+ readFieldsInOldFormat(
+ new DataInputStream(new ByteArrayInputStream(data)));
+ }
+ }
+
+ private void readFieldsInOldFormat(DataInputStream in) throws IOException {
+ NMTokenIdentifierProto.Builder builder =
+ NMTokenIdentifierProto.newBuilder();
+
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(in.readLong(), in.readInt()),
+ in.readInt());
+ builder.setAppAttemptId(((ApplicationAttemptIdPBImpl)appAttemptId)
+ .getProto());
+ String[] hostAddr = in.readUTF().split(":");
+ NodeId nodeId = NodeId.newInstance(hostAddr[0],
+ Integer.parseInt(hostAddr[1]));
+ builder.setNodeId(((NodeIdPBImpl)nodeId).getProto());
+ builder.setAppSubmitter(in.readUTF());
+ builder.setKeyId(in.readInt());
+ proto = builder.build();
}
@Override
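
A small assumption baked into the legacy NMToken layout, worth calling out: the node is stored as a single "host:port" UTF string, so the split on ':' presumes colon-free hostnames (IPv6 literals were never valid in that format). Sketched with an illustrative class name:

final class LegacyNodeAddr {
  final String host;
  final int port;

  LegacyNodeAddr(String hostPort) {
    // Mirrors the old-format reader above: first token is the host,
    // second is the port; anything else was never a valid legacy value.
    String[] parts = hostPort.split(":");
    if (parts.length != 2) {
      throw new IllegalArgumentException("expected host:port, got " + hostPort);
    }
    this.host = parts[0];
    this.port = Integer.parseInt(parts[1]);
  }
}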
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e5f7ea9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java
index 82e1943..51fbe9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
@@ -48,6 +49,15 @@ public class TestYARNTokenIdentifier {
@Test
public void testNMTokenIdentifier() throws IOException {
+ testNMTokenIdentifier(false);
+ }
+
+ @Test
+ public void testNMTokenIdentifierOldFormat() throws IOException {
+ testNMTokenIdentifier(true);
+ }
+
+ public void testNMTokenIdentifier(boolean oldFormat) throws IOException {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
NodeId nodeId = NodeId.newInstance("host0", 0);
@@ -58,8 +68,13 @@ public class TestYARNTokenIdentifier {
appAttemptId, nodeId, applicationSubmitter, masterKeyId);
NMTokenIdentifier anotherToken = new NMTokenIdentifier();
-
- byte[] tokenContent = token.getBytes();
+
+ byte[] tokenContent;
+ if (oldFormat) {
+ tokenContent = writeInOldFormat(token);
+ } else {
+ tokenContent = token.getBytes();
+ }
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
anotherToken.readFields(dib);
@@ -88,6 +103,15 @@ public class TestYARNTokenIdentifier {
@Test
public void testAMRMTokenIdentifier() throws IOException {
+ testAMRMTokenIdentifier(false);
+ }
+
+ @Test
+ public void testAMRMTokenIdentifierOldFormat() throws IOException {
+ testAMRMTokenIdentifier(true);
+ }
+
+ public void testAMRMTokenIdentifier(boolean oldFormat) throws IOException {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1, 1), 1);
int masterKeyId = 1;
@@ -95,7 +119,13 @@ public class TestYARNTokenIdentifier {
AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, masterKeyId);
AMRMTokenIdentifier anotherToken = new AMRMTokenIdentifier();
- byte[] tokenContent = token.getBytes();
+
+ byte[] tokenContent;
+ if (oldFormat) {
+ tokenContent = writeInOldFormat(token);
+ } else {
+ tokenContent = token.getBytes();
+ }
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
anotherToken.readFields(dib);
@@ -138,9 +168,20 @@ public class TestYARNTokenIdentifier {
Assert.assertEquals("clientName from proto is not the same with original token",
anotherToken.getClientName(), clientName);
}
-
+
@Test
public void testContainerTokenIdentifier() throws IOException {
+ testContainerTokenIdentifier(false, false);
+ }
+
+ @Test
+ public void testContainerTokenIdentifierOldFormat() throws IOException {
+ testContainerTokenIdentifier(true, true);
+ testContainerTokenIdentifier(true, false);
+ }
+
+ public void testContainerTokenIdentifier(boolean oldFormat,
+ boolean withLogAggregation) throws IOException {
ContainerId containerID = ContainerId.newContainerId(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(
1, 1), 1), 1);
@@ -158,8 +199,13 @@ public class TestYARNTokenIdentifier {
masterKeyId, rmIdentifier, priority, creationTime);
ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier();
-
- byte[] tokenContent = token.getBytes();
+
+ byte[] tokenContent;
+ if (oldFormat) {
+ tokenContent = writeInOldFormat(token, withLogAggregation);
+ } else {
+ tokenContent = token.getBytes();
+ }
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
anotherToken.readFields(dib);
@@ -426,4 +472,67 @@ public class TestYARNTokenIdentifier {
anotherToken.getExecutionType());
}
+ @SuppressWarnings("deprecation")
+ private byte[] writeInOldFormat(ContainerTokenIdentifier token,
+ boolean withLogAggregation) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+ ApplicationAttemptId applicationAttemptId = token.getContainerID()
+ .getApplicationAttemptId();
+ ApplicationId applicationId = applicationAttemptId.getApplicationId();
+ out.writeLong(applicationId.getClusterTimestamp());
+ out.writeInt(applicationId.getId());
+ out.writeInt(applicationAttemptId.getAttemptId());
+ out.writeLong(token.getContainerID().getContainerId());
+ out.writeUTF(token.getNmHostAddress());
+ out.writeUTF(token.getApplicationSubmitter());
+ out.writeInt(token.getResource().getMemory());
+ out.writeInt(token.getResource().getVirtualCores());
+ out.writeLong(token.getExpiryTimeStamp());
+ out.writeInt(token.getMasterKeyId());
+ out.writeLong(token.getRMIdentifier());
+ out.writeInt(token.getPriority().getPriority());
+ out.writeLong(token.getCreationTime());
+ if (withLogAggregation) {
+ if (token.getLogAggregationContext() == null) {
+ out.writeInt(-1);
+ } else {
+ byte[] logAggregationContext = ((LogAggregationContextPBImpl)
+ token.getLogAggregationContext()).getProto().toByteArray();
+ out.writeInt(logAggregationContext.length);
+ out.write(logAggregationContext);
+ }
+ }
+ out.close();
+ return baos.toByteArray();
+ }
+
+ private byte[] writeInOldFormat(NMTokenIdentifier token) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+ ApplicationId applicationId = token.getApplicationAttemptId()
+ .getApplicationId();
+ out.writeLong(applicationId.getClusterTimestamp());
+ out.writeInt(applicationId.getId());
+ out.writeInt(token.getApplicationAttemptId().getAttemptId());
+ out.writeUTF(token.getNodeId().toString());
+ out.writeUTF(token.getApplicationSubmitter());
+ out.writeInt(token.getKeyId());
+ out.close();
+ return baos.toByteArray();
+ }
+
+ private byte[] writeInOldFormat(AMRMTokenIdentifier token)
+ throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+ ApplicationId applicationId = token.getApplicationAttemptId()
+ .getApplicationId();
+ out.writeLong(applicationId.getClusterTimestamp());
+ out.writeInt(applicationId.getId());
+ out.writeInt(token.getApplicationAttemptId().getAttemptId());
+ out.writeInt(token.getKeyId());
+ out.close();
+ return baos.toByteArray();
+ }
}