You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2020/06/10 13:24:32 UTC
[hadoop-ozone] branch master updated: HDDS-3750. Improve SCM
performance with 3.2% by avoid stream.collect (#1035)
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 4da05e4 HDDS-3750. Improve SCM performance with 3.2% by avoid stream.collect (#1035)
4da05e4 is described below
commit 4da05e463828f0d55a5497f6d1364663d682a083
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Jun 10 21:24:20 2020 +0800
HDDS-3750. Improve SCM performance with 3.2% by avoid stream.collect (#1035)
---
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 9 ++-
.../hadoop/hdds/scm/pipeline/PipelineStateMap.java | 72 +++++++++++++++-------
.../hdds/scm/server/SCMClientProtocolServer.java | 8 ++-
3 files changed, 61 insertions(+), 28 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 7776b21..97d4837 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -256,6 +256,11 @@ public final class Pipeline {
public HddsProtos.Pipeline getProtobufMessage()
throws UnknownPipelineStateException {
+ List<HddsProtos.DatanodeDetailsProto> members = new ArrayList<>();
+ for (DatanodeDetails dn : nodeStatus.keySet()) {
+ members.add(dn.getProtoBufMessage());
+ }
+
HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder()
.setId(id.getProtobuf())
.setType(type)
@@ -263,9 +268,7 @@ public final class Pipeline {
.setState(PipelineState.getProtobuf(state))
.setLeaderID(leaderId != null ? leaderId.toString() : "")
.setCreationTimeStamp(creationTimestamp.toEpochMilli())
- .addAllMembers(nodeStatus.keySet().stream()
- .map(DatanodeDetails::getProtoBufMessage)
- .collect(Collectors.toList()));
+ .addAllMembers(members);
// To save the message size on wire, only transfer the node order based on
// network topology
List<DatanodeDetails> nodes = nodesInOrder.get();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 3c7af08..69fbc08 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -33,7 +33,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
/**
* Holds the data structures which maintain the information about pipeline and
@@ -152,9 +151,14 @@ class PipelineStateMap {
List<Pipeline> getPipelines(ReplicationType type) {
Preconditions.checkNotNull(type, "Replication type cannot be null");
- return pipelineMap.values().stream()
- .filter(p -> p.getType().equals(type))
- .collect(Collectors.toList());
+ List<Pipeline> pipelines = new ArrayList<>();
+ for (Pipeline pipeline : pipelineMap.values()) {
+ if (pipeline.getType() == type) {
+ pipelines.add(pipeline);
+ }
+ }
+
+ return pipelines;
}
/**
@@ -168,10 +172,14 @@ class PipelineStateMap {
Preconditions.checkNotNull(type, "Replication type cannot be null");
Preconditions.checkNotNull(factor, "Replication factor cannot be null");
- return pipelineMap.values().stream()
- .filter(pipeline -> pipeline.getType() == type
- && pipeline.getFactor() == factor)
- .collect(Collectors.toList());
+ List<Pipeline> pipelines = new ArrayList<>();
+ for (Pipeline pipeline : pipelineMap.values()) {
+ if (pipeline.getType() == type && pipeline.getFactor() == factor) {
+ pipelines.add(pipeline);
+ }
+ }
+
+ return pipelines;
}
/**
@@ -188,10 +196,16 @@ class PipelineStateMap {
Set<PipelineState> pipelineStates = new HashSet<>();
pipelineStates.addAll(Arrays.asList(states));
- return pipelineMap.values().stream().filter(
- pipeline -> pipeline.getType() == type && pipelineStates
- .contains(pipeline.getPipelineState()))
- .collect(Collectors.toList());
+
+ List<Pipeline> pipelines = new ArrayList<>();
+ for (Pipeline pipeline : pipelineMap.values()) {
+ if (pipeline.getType() == type
+ && pipelineStates.contains(pipeline.getPipelineState())) {
+ pipelines.add(pipeline);
+ }
+ }
+
+ return pipelines;
}
/**
@@ -210,15 +224,21 @@ class PipelineStateMap {
Preconditions.checkNotNull(state, "Pipeline state cannot be null");
if (state == PipelineState.OPEN) {
- return Collections.unmodifiableList(
+ return new ArrayList<>(
query2OpenPipelines.getOrDefault(
new PipelineQuery(type, factor), Collections.EMPTY_LIST));
}
- return pipelineMap.values().stream().filter(
- pipeline -> pipeline.getType() == type
- && pipeline.getPipelineState() == state
- && pipeline.getFactor() == factor)
- .collect(Collectors.toList());
+
+ List<Pipeline> pipelines = new ArrayList<>();
+ for (Pipeline pipeline : pipelineMap.values()) {
+ if (pipeline.getType() == type
+ && pipeline.getPipelineState() == state
+ && pipeline.getFactor() == factor) {
+ pipelines.add(pipeline);
+ }
+ }
+
+ return pipelines;
}
/**
@@ -242,10 +262,18 @@ class PipelineStateMap {
.checkNotNull(excludeDns, "Datanode exclude list cannot be null");
Preconditions
.checkNotNull(excludeDns, "Pipeline exclude list cannot be null");
- return getPipelines(type, factor, state).stream().filter(
- pipeline -> !discardPipeline(pipeline, excludePipelines)
- && !discardDatanode(pipeline, excludeDns))
- .collect(Collectors.toList());
+
+ List<Pipeline> pipelines = getPipelines(type, factor, state);
+ Iterator<Pipeline> iter = pipelines.iterator();
+ while (iter.hasNext()) {
+ Pipeline pipeline = iter.next();
+ if (discardPipeline(pipeline, excludePipelines) ||
+ discardDatanode(pipeline, excludeDns)) {
+ iter.remove();
+ }
+ }
+
+ return pipelines;
}
private boolean discardPipeline(Pipeline pipeline,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index e049ef5..d935f24 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -285,10 +285,13 @@ public class SCMClientProtocolServer implements
List<ContainerWithPipeline> cpList = new ArrayList<>();
+ StringBuilder strContainerIDs = new StringBuilder();
for (Long containerID : containerIDs) {
try {
ContainerWithPipeline cp = getContainerWithPipelineCommon(containerID);
cpList.add(cp);
+ strContainerIDs.append(ContainerID.valueof(containerID).toString());
+ strContainerIDs.append(",");
} catch (IOException ex) {
AUDIT.logReadFailure(buildAuditMessageForFailure(
SCMAction.GET_CONTAINER_WITH_PIPELINE_BATCH,
@@ -298,11 +301,10 @@ public class SCMClientProtocolServer implements
}
}
+
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.GET_CONTAINER_WITH_PIPELINE_BATCH,
- Collections.singletonMap("containerIDs",
- containerIDs.stream().map(id -> ContainerID.valueof(id).toString())
- .collect(Collectors.joining(",")))));
+ Collections.singletonMap("containerIDs", strContainerIDs.toString())));
return cpList;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org