You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/11 22:19:41 UTC
[7/8] flink git commit: [FLINK-1969] [runtime] Remove deprecated
profiler code
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
deleted file mode 100644
index e07f144..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.types.StringValue;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A single instance profiling event encapsulates profiling information for one particular instance.
- */
-public final class SingleInstanceProfilingEvent extends InstanceProfilingEvent {
-
- private static final long serialVersionUID = 1L;
-
- private String instanceName;
-
- /**
- * Constructs a new instance profiling event.
- *
- * @param profilingInterval
- * the interval of time this profiling event covers in milliseconds
- * @param ioWaitCPU
- * the percentage of time the CPU(s) spent in state IOWAIT during the profiling interval
- * @param idleCPU
- * the percentage of time the CPU(s) spent in state IDLE during the profiling interval
- * @param userCPU
- * the percentage of time the CPU(s) spent in state USER during the profiling interval
- * @param systemCPU
- * the percentage of time the CPU(s) spent in state SYSTEM during the profiling interval
- * @param hardIrqCPU
- * the percentage of time the CPU(s) spent in state HARD_IRQ during the profiling interval
- * @param softIrqCPU
- * the percentage of time the CPU(s) spent in state SOFT_IRQ during the profiling interval
- * @param totalMemory
- * the total amount of this instance's main memory in bytes
- * @param freeMemory
- * the free amount of this instance's main memory in bytes
- * @param bufferedMemory
- * the amount of main memory the instance uses for file buffers
- * @param cachedMemory
- * the amount of main memory the instance uses as cache memory
- * @param cachedSwapMemory
- * The amount of main memory the instance uses for cached swaps
- * @param receivedBytes
- * the number of bytes received via network during the profiling interval
- * @param transmittedBytes
- * the number of bytes transmitted via network during the profiling interval
- * @param jobID
- * the ID of this job this profiling event belongs to
- * @param timestamp
- * the time stamp of this profiling event's creation
- * @param profilingTimestamp
- * the time stamp relative to the beginning of the job's execution
- * @param instanceName
- * the name of the instance this profiling event refers to
- */
- public SingleInstanceProfilingEvent(final int profilingInterval, final int ioWaitCPU, final int idleCPU,
- final int userCPU, final int systemCPU, final int hardIrqCPU, final int softIrqCPU, final long totalMemory,
- final long freeMemory, final long bufferedMemory, final long cachedMemory, final long cachedSwapMemory,
- final long receivedBytes, final long transmittedBytes, final JobID jobID, final long timestamp,
- final long profilingTimestamp, final String instanceName)
- {
- super(profilingInterval, ioWaitCPU, idleCPU, userCPU, systemCPU, hardIrqCPU, softIrqCPU, totalMemory,
- freeMemory, bufferedMemory, cachedMemory, cachedSwapMemory, receivedBytes, transmittedBytes, jobID,
- timestamp, profilingTimestamp);
-
- Preconditions.checkNotNull(instanceName);
- this.instanceName = instanceName;
- }
-
- /**
- * Default constructor for serialization/deserialization.
- */
- public SingleInstanceProfilingEvent() {
- super();
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Returns the name of the instance.
- *
- * @return the name of the instance
- */
- public String getInstanceName() {
- return this.instanceName;
- }
-
- // --------------------------------------------------------------------------------------------
- // Serialization
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void read(DataInputView in) throws IOException {
- super.read(in);
- this.instanceName = StringValue.readString(in);
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- super.write(out);
- StringValue.writeString(this.instanceName, out);
- }
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof SingleInstanceProfilingEvent) {
- SingleInstanceProfilingEvent other = (SingleInstanceProfilingEvent) obj;
- return super.equals(obj) && this.instanceName.equals(other.instanceName);
-
- }
- else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return super.hashCode() + 31*instanceName.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
deleted file mode 100644
index 08b932a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * Through this interface it is possible to access profiling data about the CPU utilization
- * of the corresponding execution thread during its execution.
- */
-public class ThreadProfilingEvent extends VertexProfilingEvent {
-
- private static final long serialVersionUID = -3006867830244444710L;
-
- private int userTime;
-
- private int systemTime;
-
- private int blockedTime;
-
- private int waitedTime;
-
- public ThreadProfilingEvent(int userTime, int systemTime, int blockedTime, int waitedTime,
- JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
- int profilingInterval, JobID jobID, long timestamp, long profilingTimestamp)
- {
- super(vertexId, subtask, executionId, profilingInterval, jobID, timestamp, profilingTimestamp);
-
- this.userTime = userTime;
- this.systemTime = systemTime;
- this.blockedTime = blockedTime;
- this.waitedTime = waitedTime;
- }
-
- public ThreadProfilingEvent() {
- super();
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Returns the percentage of time the execution thread spent in
- * user mode in the given profiling interval.
- *
- * @return the percentage of time spent in user mode
- */
- public int getUserTime() {
- return this.userTime;
- }
-
- /**
- * Returns the percentage of time the execution thread spent in
- * system mode in the given profiling interval.
- *
- * @return the percentage of time spent in system mode
- */
- public int getSystemTime() {
- return this.systemTime;
- }
-
- /**
- * Returns the percentage of time the execution thread has been
- * blocked to enter or reenter a monitor in the given profiling interval.
- *
- * @return the percentage of time the thread has been blocked
- */
- public int getBlockedTime() {
- return this.blockedTime;
- }
-
- /**
- * Returns the percentage of time the execution thread spent in
- * either <code>WAITING</code> or <code>TIMED_WAITING</code> state in the given profiling interval.
- *
- * @return the percentage of time the thread spent waiting
- */
- public int getWaitedTime() {
- return this.waitedTime;
- }
-
- // --------------------------------------------------------------------------------------------
- // Serialization
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void read(DataInputView in) throws IOException {
- super.read(in);
-
- this.userTime = in.readInt();
- this.systemTime = in.readInt();
- this.blockedTime = in.readInt();
- this.waitedTime = in.readInt();
- }
-
-
- @Override
- public void write(DataOutputView out) throws IOException {
- super.write(out);
-
- out.writeInt(this.userTime);
- out.writeInt(this.systemTime);
- out.writeInt(this.blockedTime);
- out.writeInt(this.waitedTime);
- }
-
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof ThreadProfilingEvent) {
- final ThreadProfilingEvent other = (ThreadProfilingEvent) obj;
-
- return this.userTime == other.userTime &&
- this.systemTime == other.systemTime &&
- this.blockedTime == other.blockedTime &&
- this.waitedTime == other.waitedTime &&
- super.equals(obj);
- }
- else {
- return false;
- }
- }
-
- // hash code is inherited from the superclass
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
deleted file mode 100644
index fc18758..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.profiling.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * This interface is a base interface for profiling data which
- * pertains to the execution of tasks.
- */
-public abstract class VertexProfilingEvent extends ProfilingEvent {
-
- private static final long serialVersionUID = -5364961557518174880L;
-
- private final JobVertexID vertexId;
-
- private int subtask;
-
- private final ExecutionAttemptID executionId;
-
- private int profilingInterval;
-
-
- public VertexProfilingEvent(JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
- int profilingInterval, JobID jobID, long timestamp, long profilingTimestamp)
- {
- super(jobID, timestamp, profilingTimestamp);
-
- this.vertexId = vertexId;
- this.subtask = subtask;
- this.executionId = executionId;
- this.profilingInterval = profilingInterval;
- }
-
- public VertexProfilingEvent() {
- super();
- this.vertexId = new JobVertexID();
- this.executionId = new ExecutionAttemptID();
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Returns the ID of the vertex this profiling information
- * belongs to.
- *
- * @return the ID of the vertex this profiling information belongs to
- */
- public JobVertexID getVertexID() {
- return this.vertexId;
- }
-
- /**
- * The interval in milliseconds to which the rest
- * of the profiling data relates to.
- *
- * @return the profiling interval given in milliseconds
- */
- public int getProfilingInterval() {
- return this.profilingInterval;
- }
-
- public int getSubtask() {
- return subtask;
- }
-
- public ExecutionAttemptID getExecutionId() {
- return executionId;
- }
-
- // --------------------------------------------------------------------------------------------
- // Serialization
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void read(DataInputView in) throws IOException {
- super.read(in);
-
- this.vertexId.read(in);
- this.executionId.read(in);
- this.subtask = in.readInt();
- this.profilingInterval = in.readInt();
- }
-
-
- @Override
- public void write(DataOutputView out) throws IOException {
- super.write(out);
-
- this.vertexId.write(out);
- this.executionId.write(out);
- out.writeInt(subtask);
- out.writeInt(this.profilingInterval);
- }
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof VertexProfilingEvent) {
- final VertexProfilingEvent other = (VertexProfilingEvent) obj;
-
- return super.equals(other) && this.subtask == other.subtask &&
- this.profilingInterval == other.profilingInterval &&
- this.vertexId.equals(other.vertexId) &&
- this.executionId.equals(other.executionId);
- }
- else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return super.hashCode() ^ vertexId.hashCode() ^ (31*subtask) ^ executionId.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 15a9446..75463fd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -53,11 +53,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
-import org.apache.flink.runtime.profiling.ProfilingUtils
import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
-import org.slf4j.LoggerFactory
-
import akka.actor._
import scala.concurrent._
@@ -98,7 +95,6 @@ class JobManager(val flinkConfiguration: Configuration,
val libraryCacheManager: BlobLibraryCacheManager,
val archive: ActorRef,
val accumulatorManager: AccumulatorManager,
- val profiler: Option[ActorRef],
val defaultExecutionRetries: Int,
val delayBetweenRetries: Long,
val timeout: FiniteDuration)
@@ -124,7 +120,6 @@ class JobManager(val flinkConfiguration: Configuration,
}
archive ! PoisonPill
- profiler.foreach( ref => ref ! PoisonPill )
for((e,_) <- currentJobs.values) {
e.fail(new Exception("The JobManager is shutting down."))
@@ -935,7 +930,7 @@ object JobManager {
/**
* Create the job manager components as (instanceManager, scheduler, libraryCacheManager,
- * archiverProps, accumulatorManager, profiler, defaultExecutionRetries,
+ * archiverProps, accumulatorManager, defaultExecutionRetries,
* delayBetweenRetries, timeout)
*
* @param configuration The configuration from which to parse the config values.
@@ -943,15 +938,13 @@ object JobManager {
*/
def createJobManagerComponents(configuration: Configuration) :
(InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
- Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, Int) = {
+ Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = {
val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
- val profilingEnabled = configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, false)
-
val cleanupInterval = configuration.getLong(
ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
@@ -978,12 +971,6 @@ object JobManager {
val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
- val profilerProps: Option[Props] = if (profilingEnabled) {
- Some(Props(classOf[JobManagerProfiler]))
- } else {
- None
- }
-
val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
var blobServer: BlobServer = null
@@ -1018,7 +1005,7 @@ object JobManager {
}
(instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
- profilerProps, executionRetries, delayBetweenRetries, timeout, archiveCount)
+ executionRetries, delayBetweenRetries, timeout, archiveCount)
}
/**
@@ -1052,12 +1039,9 @@ object JobManager {
archiverActorName: Option[String]): (ActorRef, ActorRef) = {
val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
- profilerProps, executionRetries, delayBetweenRetries,
+ executionRetries, delayBetweenRetries,
timeout, _) = createJobManagerComponents(configuration)
- val profiler: Option[ActorRef] =
- profilerProps.map( props => actorSystem.actorOf(props, PROFILER_NAME) )
-
// start the archiver wither with the given name, or without (avoid name conflicts)
val archiver: ActorRef = archiverActorName match {
case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
@@ -1065,7 +1049,7 @@ object JobManager {
}
val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
- libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
+ libraryCacheManager, archiver, accumulatorManager, executionRetries,
delayBetweenRetries, timeout)
val jobManager: ActorRef = jobMangerActorName match {
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
deleted file mode 100644
index dd3a1b7..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import akka.actor.Actor
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
-import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
-import org.apache.flink.runtime.profiling.impl.types.{InternalInstanceProfilingData, InternalExecutionVertexThreadProfilingData}
-
-import scala.collection.convert.WrapAsScala
-
-/**
- * Basic skeleton for the JobManager profiler. Currently, it simply logs the received messages.
- */
-class JobManagerProfiler
- extends Actor
- with ActorLogMessages
- with ActorSynchronousLogging
- with WrapAsScala {
- override def receiveWithLogMessages: Receive = {
- case ReportProfilingData(profilingContainer) =>
- profilingContainer.getIterator foreach {
- case x: InternalExecutionVertexThreadProfilingData =>
- log.info(s"Received InternalExecutionVertexThreadProfilingData $x.")
- case x: InternalInstanceProfilingData =>
- log.info(s"Received InternalInstanceProfilingData $x.")
- case x =>
- log.error(s"Received unknown profiling data: ${x.getClass.getName}" )
- }
- }
-
- /**
- * Handle unmatched messages with an exception.
- */
- override def unhandled(message: Any): Unit = {
- // let the actor crash
- throw new RuntimeException("Received unknown message " + message)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerProfilerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerProfilerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerProfilerMessages.scala
deleted file mode 100644
index 85e0fc3..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerProfilerMessages.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer
-
-/**
- * This object contains the job manager profiler messages
- */
-object JobManagerProfilerMessages {
-
- /**
- * Reports profiling data to the profiler.
- * @param profilingDataContainer
- */
- case class ReportProfilingData(profilingDataContainer: ProfilingDataContainer)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerProfilerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerProfilerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerProfilerMessages.scala
deleted file mode 100644
index 0662d48..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerProfilerMessages.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.taskmanager.Task
-
-object TaskManagerProfilerMessages {
-
- /**
- * Requests to monitor the specified [[task]].
- *
- * @param task
- */
- case class MonitorTask(task: Task)
-
- /**
- * Requests to unmonitor the task associated to [[executionID]].
- *
- * @param executionID
- */
- case class UnmonitorTask(executionID: ExecutionAttemptID)
-
- /**
- * Registers the sender as a profiling event listener.
- */
- case object RegisterProfilingListener
-
- /**
- * Unregisters the sender as a profiling event listener.
- */
- case object UnregisterProfilingListener
-
- /**
- * Makes the task manager profiling the running tasks.
- */
- case object ProfileTasks
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
deleted file mode 100644
index f0079f8..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import java.lang.management.ManagementFactory
-import java.util.concurrent.TimeUnit
-
-import akka.actor.{Cancellable, ActorRef, Actor, ActorLogging}
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
-import org.apache.flink.runtime.execution.{RuntimeEnvironment, ExecutionState}
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.JobVertexID
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
-import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
-import org.apache.flink.runtime.messages.TaskManagerProfilerMessages._
-import org.apache.flink.runtime.profiling.ProfilingException
-import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer
-import org.apache.flink.runtime.profiling.impl.{EnvironmentThreadSet, InstanceProfiler}
-
-import scala.concurrent.duration.FiniteDuration
-
-/**
- * Actor which is responsible for profiling task threads on the [[TaskManager]]. The monitoring
- * is triggered by the self-addressed message [[ProfileTasks]] which is scheduled to be sent
- * repeatedly.
- *
- * @param instancePath Akka URL to [[TaskManager]] instance
- * @param reportInterval Interval of profiling action
- */
-class TaskManagerProfiler(val instancePath: String, val reportInterval: Int)
- extends Actor with ActorLogMessages with ActorSynchronousLogging {
-
- import context.dispatcher
-
- val tmx = ManagementFactory.getThreadMXBean
- val instanceProfiler = new InstanceProfiler(instancePath)
- val listeners = scala.collection.mutable.Set[ActorRef]()
- val environments = scala.collection.mutable.HashMap[ExecutionAttemptID, RuntimeEnvironment]()
- val monitoredThreads = scala.collection.mutable.HashMap[RuntimeEnvironment,
- EnvironmentThreadSet]()
-
- var monitoringScheduler: Option[Cancellable] = None
-
- if (tmx.isThreadContentionMonitoringSupported) {
- tmx.setThreadContentionMonitoringEnabled(true)
- } else {
- throw new ProfilingException("The thread contention monitoring is not supported.")
- }
-
-
- override def receiveWithLogMessages: Receive = {
- case MonitorTask(task) =>
- task.registerExecutionListener(self)
- environments += task.getExecutionId -> task.getEnvironment
-
- case UnmonitorTask(executionAttemptID) =>
- environments.remove(executionAttemptID)
-
- case RegisterProfilingListener =>
- listeners += sender
- if (monitoringScheduler.isEmpty) {
- startMonitoring()
- }
-
- case UnregisterProfilingListener =>
- listeners -= sender
- if (listeners.isEmpty) {
- stopMonitoring()
- }
-
- case ProfileTasks =>
- val timestamp = System.currentTimeMillis()
-
- val profilingDataContainer = new ProfilingDataContainer()
-
- for ((env, set) <- monitoredThreads) {
- val threadProfilingData = set.captureCPUUtilization(env.getJobID, tmx, timestamp)
-
- if (threadProfilingData != null) {
- profilingDataContainer.addProfilingData(threadProfilingData)
- }
-
- if (monitoredThreads.nonEmpty) {
- val instanceProfilingData = try {
- Some(instanceProfiler.generateProfilingData(timestamp))
- } catch {
- case e: ProfilingException =>
- log.error("Error while retrieving instance profiling data.", e)
- None
- }
-
- instanceProfilingData foreach {
- profilingDataContainer.addProfilingData(_)
- }
-
- if (!profilingDataContainer.isEmpty) {
- for (listener <- listeners) {
- listener ! ReportProfilingData(profilingDataContainer)
- }
- }
-
- profilingDataContainer.clear()
- }
- }
-
- case ExecutionStateChanged(_, vertexID, _, _, subtaskIndex, executionID, newExecutionState,
- _, _) =>
- import ExecutionState._
-
- environments.get(executionID) match {
- case Some(environment) =>
- newExecutionState match {
- case RUNNING => registerMainThreadForCPUProfiling(environment, vertexID,
- subtaskIndex, executionID)
- case FINISHED | CANCELING | CANCELED | FAILED =>
- unregisterMainThreadFromCPUProfiling(environment)
- case _ =>
- }
- case None =>
- log.warn(s"Could not find environment for execution id $executionID.")
- }
- }
-
- /**
- * Handle unmatched messages with an exception.
- */
- override def unhandled(message: Any): Unit = {
- // let the actor crash
- throw new RuntimeException("Received unknown message " + message)
- }
-
- def startMonitoring(): Unit = {
- val interval = new FiniteDuration(reportInterval, TimeUnit.MILLISECONDS)
- val delay = new FiniteDuration((reportInterval * Math.random()).toLong, TimeUnit.MILLISECONDS)
-
- // schedule ProfileTasks message to be sent repeatedly to oneself
- monitoringScheduler = Some(context.system.scheduler.schedule(delay, interval, self,
- ProfileTasks))
- }
-
- def stopMonitoring(): Unit = {
- monitoringScheduler.foreach {
- _.cancel()
- }
- monitoringScheduler = None
- }
-
- def registerMainThreadForCPUProfiling(environment: RuntimeEnvironment, vertexID: JobVertexID,
- subtask: Int,
- executionID: ExecutionAttemptID): Unit = {
- monitoredThreads += environment -> new EnvironmentThreadSet(tmx,
- environment.getExecutingThread, vertexID,
- subtask, executionID)
- }
-
- def unregisterMainThreadFromCPUProfiling(environment: RuntimeEnvironment): Unit = {
- monitoredThreads.remove(environment) match {
- case Some(set) =>
- if (set.getMainThread != environment.getExecutingThread) {
- log.error(s"The thread ${environment.getExecutingThread.getName} is not the main thread" +
- s" of this environment.")
- }
- case None =>
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/impl/InstanceProfilerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/impl/InstanceProfilerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/impl/InstanceProfilerTest.java
deleted file mode 100644
index 85a4a37..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/impl/InstanceProfilerTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.profiling.impl;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.initMocks;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.net.InetAddress;
-
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.profiling.ProfilingException;
-import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-/**
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(InstanceProfiler.class)
-public class InstanceProfilerTest {
-
- @Mock
- private InstanceConnectionInfo infoMock;
-
- @Mock
- private InetAddress addressMock;
-
- @Mock
- private BufferedReader cpuBufferMock;
-
- @Mock
- private BufferedReader networkBufferMock;
-
- @Mock
- private BufferedReader memoryBufferMock;
-
- @Mock
- private FileReader cpuReaderMock;
-
- @Mock
- private FileReader networkReaderMock;
-
- @Mock
- private FileReader memoryReaderMock;
-
- // object under test
- InstanceProfiler out;
-
- @Before
- public void setUp() throws Exception {
- initMocks(this);
- when(this.infoMock.address()).thenReturn(this.addressMock);
- when(this.addressMock.getHostAddress()).thenReturn("192.168.1.1");
-
- whenNew(FileReader.class).withArguments(InstanceProfiler.PROC_STAT).thenReturn(this.cpuReaderMock);
- whenNew(BufferedReader.class).withArguments(this.cpuReaderMock).thenReturn(this.cpuBufferMock);
-
- when(this.cpuBufferMock.readLine()).thenReturn(
- "cpu 222875 20767 209704 3782096 209864 0 1066 0 0 0"
- );
-
- whenNew(FileReader.class).withArguments(InstanceProfiler.PROC_NET_DEV).thenReturn(this.networkReaderMock);
- whenNew(BufferedReader.class).withArguments(this.networkReaderMock).thenReturn(this.networkBufferMock);
-
- when(this.networkBufferMock.readLine())
- .thenReturn(
- " eth0: 364729203 286442 0 0 0 0 0 1060 14483806 191563 0 0 0 0 0 0",
- (String) null,
- " eth0: 364729203 286442 0 0 0 0 0 1060 14483806 191563 0 0 0 0 0 0",
- (String) null,
- " eth0: 364729203 286442 0 0 0 0 0 1060 14483806 191563 0 0 0 0 0 0",
- (String) null
- );
-
- whenNew(FileReader.class).withArguments(InstanceProfiler.PROC_MEMINFO).thenReturn(this.memoryReaderMock);
- whenNew(BufferedReader.class).withArguments(this.memoryReaderMock).thenReturn(this.memoryBufferMock);
-
- when(this.memoryBufferMock.readLine()).thenReturn(
- "MemTotal: 8052956 kB",
- "MemFree: 3999880 kB",
- "Buffers: 77216 kB",
- "Cached: 1929640 kB",
- null,
- "MemTotal: 8052956 kB",
- "MemFree: 3999880 kB",
- "Buffers: 77216 kB",
- "Cached: 1929640 kB",
- null,
- "MemTotal: 8052956 kB",
- "MemFree: 3999880 kB",
- "Buffers: 77216 kB",
- "Cached: 1929640 kB",
- null
- );
-
- PowerMockito.mockStatic(System.class);
- when(System.currentTimeMillis()).thenReturn(0L);
-
- this.out = new InstanceProfiler("InstanceProfilerTest");
- }
-
- @Test
- public void shouldHaveNetworkTraffic() {
-
- try {
- final InternalInstanceProfilingData generateProfilingData = out.generateProfilingData(0L);
- assertEquals(0L, generateProfilingData.getReceivedBytes());
- assertEquals(0L, generateProfilingData.getTransmittedBytes());
- } catch (ProfilingException e) {
- fail(e.getMessage());
- }
- }
-
- @Test
- public void shouldHaveMemSettingsMeasured() {
-
- try {
- final InternalInstanceProfilingData generateProfilingData = out.generateProfilingData(0L);
-
- final long totalMemory = generateProfilingData.getTotalMemory();
- assertThat(totalMemory, is(equalTo(8052956L)));
-
- long freeMemory = generateProfilingData.getFreeMemory();
- assertThat(freeMemory, is(equalTo(3999880L)));
-
- long buffers = generateProfilingData.getBufferedMemory();
- assertThat(buffers, is(equalTo(77216L)));
-
- long cached = generateProfilingData.getCachedMemory();
- assertThat(cached, is(equalTo(1929640L)));
- } catch (ProfilingException e) {
- fail(e.getMessage());
- }
- }
-
- @Test
- public void shouldMeasureCPUUtilization() {
-
- try {
- final InternalInstanceProfilingData generateProfilingData = out.generateProfilingData(0L);
-
- assertEquals(0L, generateProfilingData.getUserCPU());
- assertEquals(0L, generateProfilingData.getIdleCPU());
- assertEquals(0L, generateProfilingData.getSystemCPU());
- assertEquals(0L, generateProfilingData.getHardIrqCPU());
- assertEquals(0L, generateProfilingData.getSoftIrqCPU());
- assertEquals(0L, generateProfilingData.getIOWaitCPU());
- } catch (ProfilingException e) {
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
deleted file mode 100644
index c3c75ce..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.profiling.types;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.ManagementTestUtils;
-import org.junit.Test;
-
-/**
- * This test checks the proper serialization and deserialization of profiling events.
- */
-public class ProfilingTypesTest {
-
- private static final int PROFILING_INTERVAL = 4321;
-
- private static final int IOWAIT_CPU = 10;
-
- private static final int IDLE_CPU = 11;
-
- private static final int USER_CPU = 12;
-
- private static final int SYSTEM_CPU = 13;
-
- private static final int HARD_IRQ_CPU = 14;
-
- private static final int SOFT_IRQ_CPU = 15;
-
- private static final long TOTAL_MEMORY = 10001L;
-
- private static final long FREE_MEMORY = 10002L;
-
- private static final long BUFFERED_MEMORY = 10003L;
-
- private static final long CACHED_MEMORY = 10004L;
-
- private static final long CACHED_SWAP_MEMORY = 10005L;
-
- private static final long RECEIVED_BYTES = 100006L;
-
- private static final long TRANSMITTED_BYTES = 100007L;
-
- private static final long TIMESTAMP = 100008L;
-
- private static final long PROFILING_TIMESTAMP = 100009L;
-
- private static final String INSTANCE_NAME = "Test Instance";
-
- private static final int USER_TIME = 17;
-
- private static final int SYSTEM_TIME = 18;
-
- private static final int BLOCKED_TIME = 19;
-
- private static final int WAITED_TIME = 20;
-
- /**
- * Tests serialization/deserialization for {@link InstanceSummaryProfilingEvent}.
- */
- @Test
- public void testInstanceSummaryProfilingEvent() {
-
- final InstanceSummaryProfilingEvent orig = new InstanceSummaryProfilingEvent(PROFILING_INTERVAL, IOWAIT_CPU,
- IDLE_CPU, USER_CPU, SYSTEM_CPU, HARD_IRQ_CPU, SOFT_IRQ_CPU, TOTAL_MEMORY, FREE_MEMORY, BUFFERED_MEMORY,
- CACHED_MEMORY, CACHED_SWAP_MEMORY, RECEIVED_BYTES, TRANSMITTED_BYTES, new JobID(), TIMESTAMP,
- PROFILING_TIMESTAMP);
-
- final InstanceSummaryProfilingEvent copy = (InstanceSummaryProfilingEvent) ManagementTestUtils.createCopy(orig);
-
- assertEquals(orig.getProfilingInterval(), copy.getProfilingInterval());
- assertEquals(orig.getIOWaitCPU(), copy.getIOWaitCPU());
- assertEquals(orig.getIdleCPU(), copy.getIdleCPU());
- assertEquals(orig.getUserCPU(), copy.getUserCPU());
- assertEquals(orig.getSystemCPU(), copy.getSystemCPU());
- assertEquals(orig.getHardIrqCPU(), copy.getHardIrqCPU());
- assertEquals(orig.getSoftIrqCPU(), copy.getSoftIrqCPU());
- assertEquals(orig.getTotalMemory(), copy.getTotalMemory());
- assertEquals(orig.getFreeMemory(), copy.getFreeMemory());
- assertEquals(orig.getBufferedMemory(), copy.getBufferedMemory());
- assertEquals(orig.getCachedMemory(), copy.getCachedMemory());
- assertEquals(orig.getCachedSwapMemory(), copy.getCachedSwapMemory());
- assertEquals(orig.getReceivedBytes(), copy.getReceivedBytes());
- assertEquals(orig.getTransmittedBytes(), copy.getTransmittedBytes());
- assertEquals(orig.getJobID(), copy.getJobID());
- assertEquals(orig.getTimestamp(), copy.getTimestamp());
- assertEquals(orig.getProfilingTimestamp(), copy.getProfilingTimestamp());
- assertEquals(orig.hashCode(), copy.hashCode());
- assertTrue(orig.equals(copy));
- }
-
- /**
- * Tests serialization/deserialization for {@link SingleInstanceProfilingEvent}.
- */
- @Test
- public void testSingleInstanceProfilingEvent() {
- try {
- final SingleInstanceProfilingEvent orig = new SingleInstanceProfilingEvent(PROFILING_INTERVAL, IOWAIT_CPU,
- IDLE_CPU, USER_CPU, SYSTEM_CPU, HARD_IRQ_CPU, SOFT_IRQ_CPU, TOTAL_MEMORY, FREE_MEMORY, BUFFERED_MEMORY,
- CACHED_MEMORY, CACHED_SWAP_MEMORY, RECEIVED_BYTES, TRANSMITTED_BYTES, new JobID(), TIMESTAMP,
- PROFILING_TIMESTAMP, INSTANCE_NAME);
-
- final SingleInstanceProfilingEvent copy = (SingleInstanceProfilingEvent) CommonTestUtils.createCopyWritable(orig);
-
- assertEquals(orig.getProfilingInterval(), copy.getProfilingInterval());
- assertEquals(orig.getIOWaitCPU(), copy.getIOWaitCPU());
- assertEquals(orig.getIdleCPU(), copy.getIdleCPU());
- assertEquals(orig.getUserCPU(), copy.getUserCPU());
- assertEquals(orig.getSystemCPU(), copy.getSystemCPU());
- assertEquals(orig.getHardIrqCPU(), copy.getHardIrqCPU());
- assertEquals(orig.getSoftIrqCPU(), copy.getSoftIrqCPU());
- assertEquals(orig.getTotalMemory(), copy.getTotalMemory());
- assertEquals(orig.getFreeMemory(), copy.getFreeMemory());
- assertEquals(orig.getBufferedMemory(), copy.getBufferedMemory());
- assertEquals(orig.getCachedMemory(), copy.getCachedMemory());
- assertEquals(orig.getCachedSwapMemory(), copy.getCachedSwapMemory());
- assertEquals(orig.getReceivedBytes(), copy.getReceivedBytes());
- assertEquals(orig.getTransmittedBytes(), copy.getTransmittedBytes());
- assertEquals(orig.getJobID(), copy.getJobID());
- assertEquals(orig.getTimestamp(), copy.getTimestamp());
- assertEquals(orig.getProfilingTimestamp(), copy.getProfilingTimestamp());
- assertEquals(orig.getInstanceName(), copy.getInstanceName());
- assertEquals(orig.hashCode(), copy.hashCode());
- assertTrue(orig.equals(copy));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- }
-
- /**
- * Tests serialization/deserialization for {@link ThreadProfilingEvent}.
- */
- @Test
- public void testThreadProfilingEvent() {
-
- final ThreadProfilingEvent orig = new ThreadProfilingEvent(USER_TIME, SYSTEM_TIME, BLOCKED_TIME, WAITED_TIME,
- new JobVertexID(), 17, new ExecutionAttemptID(), PROFILING_INTERVAL, new JobID(), TIMESTAMP, PROFILING_TIMESTAMP);
-
- final ThreadProfilingEvent copy = (ThreadProfilingEvent) ManagementTestUtils.createCopy(orig);
-
- assertEquals(orig.getUserTime(), copy.getUserTime());
- assertEquals(orig.getSystemTime(), copy.getSystemTime());
- assertEquals(orig.getBlockedTime(), copy.getBlockedTime());
- assertEquals(orig.getWaitedTime(), copy.getWaitedTime());
- assertEquals(orig.getVertexID(), copy.getVertexID());
- assertEquals(orig.getProfilingInterval(), copy.getProfilingInterval());
- assertEquals(orig.getJobID(), copy.getJobID());
- assertEquals(orig.getTimestamp(), copy.getTimestamp());
- assertEquals(orig.getProfilingTimestamp(), copy.getProfilingTimestamp());
- assertEquals(orig.hashCode(), copy.hashCode());
- assertTrue(orig.equals(copy));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index f87e151..9f9fe93 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -49,7 +49,7 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
override def startJobManager(actorSystem: ActorSystem): ActorRef = {
- val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _,
+ val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
executionRetries, delayBetweenRetries,
timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
@@ -57,7 +57,7 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
- libraryCacheManager, archive, accumulatorManager, None, executionRetries,
+ libraryCacheManager, archive, accumulatorManager, executionRetries,
delayBetweenRetries, timeout) with TestingJobManager)
actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index ddfffee..05093b5 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -71,7 +71,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
override def startJobManager(actorSystem: ActorSystem): ActorRef = {
- val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, _,
+ val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager,
executionRetries, delayBetweenRetries,
timeout, archiveCount) = JobManager.createJobManagerComponents(configuration)
@@ -79,7 +79,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
- libraryCacheManager, archive, accumulatorManager, None, executionRetries,
+ libraryCacheManager, archive, accumulatorManager, executionRetries,
delayBetweenRetries, timeout) with TestingJobManager)
val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
http://git-wip-us.apache.org/repos/asf/flink/blob/fbea2da2/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 06e16dc..7884edd 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -32,7 +32,7 @@ import org.apache.flink.yarn.Messages.StartYarnSession
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.slf4j.LoggerFactory
+
import scala.io.Source
@@ -228,18 +228,14 @@ object ApplicationMaster {
// start all the components inside the job manager
LOG.debug("Starting JobManager components")
val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
- profilerProps, executionRetries, delayBetweenRetries,
+ executionRetries, delayBetweenRetries,
timeout, _) = JobManager.createJobManagerComponents(configuration)
- // start the profiler, if needed
- val profiler: Option[ActorRef] =
- profilerProps.map( props => jobManagerSystem.actorOf(props, JobManager.PROFILER_NAME) )
-
// start the archiver
val archiver: ActorRef = jobManagerSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
- libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
+ libraryCacheManager, archiver, accumulatorManager, executionRetries,
delayBetweenRetries, timeout) with ApplicationMasterActor)
LOG.debug("Starting JobManager actor")