You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:31 UTC
[15/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (2).
# IGNITE-386: WIP on internal namings (2).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/288709a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/288709a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/288709a1
Branch: refs/heads/ignite-386
Commit: 288709a1b48260e665278037e1beb050ab8ecdb4
Parents: ace354c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Mar 3 15:55:58 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Mar 3 15:55:58 2015 +0300
----------------------------------------------------------------------
docs/core-site.ignite.xml | 2 +-
.../ignite/internal/IgniteComponentType.java | 2 +-
.../fs/IgniteHadoopFileSystemCounterWriter.java | 92 ++
.../processors/hadoop/HadoopContext.java | 2 +-
.../processors/hadoop/HadoopCounters.java | 14 +-
.../internal/processors/hadoop/HadoopImpl.java | 4 +-
.../processors/hadoop/HadoopProcessor.java | 225 ++++
.../hadoop/IgniteHadoopProcessor.java | 225 ----
.../counter/GridHadoopCounterAdapter.java | 128 ---
.../hadoop/counter/GridHadoopCountersImpl.java | 198 ----
.../counter/GridHadoopFSCounterWriter.java | 91 --
.../hadoop/counter/GridHadoopLongCounter.java | 92 --
.../counter/GridHadoopPerformanceCounter.java | 279 -----
.../hadoop/counter/HadoopCounterAdapter.java | 128 +++
.../hadoop/counter/HadoopCountersImpl.java | 198 ++++
.../hadoop/counter/HadoopLongCounter.java | 92 ++
.../counter/HadoopPerformanceCounter.java | 279 +++++
.../fs/GridHadoopDistributedFileSystem.java | 91 --
.../hadoop/fs/GridHadoopFileSystemsUtils.java | 57 -
.../hadoop/fs/GridHadoopLocalFileSystemV1.java | 39 -
.../hadoop/fs/GridHadoopLocalFileSystemV2.java | 86 --
.../hadoop/fs/GridHadoopRawLocalFileSystem.java | 304 ------
.../hadoop/fs/HadoopDistributedFileSystem.java | 91 ++
.../hadoop/fs/HadoopFileSystemsUtils.java | 57 +
.../hadoop/fs/HadoopLocalFileSystemV1.java | 39 +
.../hadoop/fs/HadoopLocalFileSystemV2.java | 86 ++
.../hadoop/fs/HadoopRawLocalFileSystem.java | 304 ++++++
.../jobtracker/GridHadoopJobMetadata.java | 305 ------
.../hadoop/jobtracker/HadoopJobMetadata.java | 305 ++++++
.../hadoop/jobtracker/HadoopJobTracker.java | 104 +-
.../hadoop/message/GridHadoopMessage.java | 27 -
.../hadoop/message/HadoopMessage.java | 27 +
.../planner/GridHadoopDefaultMapReducePlan.java | 107 --
.../GridHadoopDefaultMapReducePlanner.java | 434 --------
.../planner/HadoopDefaultMapReducePlan.java | 107 ++
.../planner/HadoopDefaultMapReducePlanner.java | 434 ++++++++
.../GridHadoopProtocolJobCountersTask.java | 45 -
.../proto/GridHadoopProtocolJobStatusTask.java | 81 --
.../proto/GridHadoopProtocolKillJobTask.java | 46 -
.../proto/GridHadoopProtocolNextTaskIdTask.java | 35 -
.../proto/GridHadoopProtocolSubmitJobTask.java | 57 -
.../proto/GridHadoopProtocolTaskAdapter.java | 113 --
.../proto/GridHadoopProtocolTaskArguments.java | 81 --
.../hadoop/proto/HadoopClientProtocol.java | 22 +-
.../proto/HadoopProtocolJobCountersTask.java | 45 +
.../proto/HadoopProtocolJobStatusTask.java | 81 ++
.../hadoop/proto/HadoopProtocolKillJobTask.java | 46 +
.../proto/HadoopProtocolNextTaskIdTask.java | 35 +
.../proto/HadoopProtocolSubmitJobTask.java | 57 +
.../hadoop/proto/HadoopProtocolTaskAdapter.java | 113 ++
.../proto/HadoopProtocolTaskArguments.java | 81 ++
.../hadoop/shuffle/GridHadoopShuffleAck.java | 91 --
.../hadoop/shuffle/GridHadoopShuffleJob.java | 593 -----------
.../shuffle/GridHadoopShuffleMessage.java | 242 -----
.../hadoop/shuffle/HadoopShuffle.java | 38 +-
.../hadoop/shuffle/HadoopShuffleAck.java | 91 ++
.../hadoop/shuffle/HadoopShuffleJob.java | 593 +++++++++++
.../hadoop/shuffle/HadoopShuffleMessage.java | 241 +++++
.../GridHadoopConcurrentHashMultimap.java | 611 -----------
.../collections/GridHadoopHashMultimap.java | 174 ---
.../collections/GridHadoopHashMultimapBase.java | 208 ----
.../shuffle/collections/GridHadoopMultimap.java | 112 --
.../collections/GridHadoopMultimapBase.java | 368 -------
.../shuffle/collections/GridHadoopSkipList.java | 726 -------------
.../HadoopConcurrentHashMultimap.java | 611 +++++++++++
.../shuffle/collections/HadoopHashMultimap.java | 174 +++
.../collections/HadoopHashMultimapBase.java | 208 ++++
.../shuffle/collections/HadoopMultimap.java | 112 ++
.../shuffle/collections/HadoopMultimapBase.java | 368 +++++++
.../shuffle/collections/HadoopSkipList.java | 726 +++++++++++++
.../shuffle/streams/GridHadoopDataInStream.java | 170 ---
.../streams/GridHadoopDataOutStream.java | 131 ---
.../streams/GridHadoopOffheapBuffer.java | 122 ---
.../shuffle/streams/HadoopDataInStream.java | 170 +++
.../shuffle/streams/HadoopDataOutStream.java | 131 +++
.../shuffle/streams/HadoopOffheapBuffer.java | 122 +++
.../taskexecutor/GridHadoopExecutorService.java | 232 ----
.../taskexecutor/GridHadoopRunnableTask.java | 22 +-
.../taskexecutor/GridHadoopTaskState.java | 38 -
.../taskexecutor/GridHadoopTaskStatus.java | 114 --
.../HadoopEmbeddedTaskExecutor.java | 8 +-
.../taskexecutor/HadoopExecutorService.java | 231 ++++
.../taskexecutor/HadoopTaskExecutorAdapter.java | 2 +-
.../hadoop/taskexecutor/HadoopTaskState.java | 38 +
.../hadoop/taskexecutor/HadoopTaskStatus.java | 114 ++
.../GridHadoopExternalTaskMetadata.java | 68 --
.../GridHadoopJobInfoUpdateRequest.java | 109 --
.../GridHadoopPrepareForJobRequest.java | 126 ---
.../external/GridHadoopProcessDescriptor.java | 150 ---
.../external/GridHadoopProcessStartedAck.java | 46 -
.../GridHadoopTaskExecutionRequest.java | 110 --
.../external/GridHadoopTaskFinishedMessage.java | 92 --
.../external/HadoopExternalTaskExecutor.java | 78 +-
.../external/HadoopExternalTaskMetadata.java | 68 ++
.../external/HadoopJobInfoUpdateRequest.java | 109 ++
.../external/HadoopPrepareForJobRequest.java | 126 +++
.../external/HadoopProcessDescriptor.java | 150 +++
.../external/HadoopProcessStartedAck.java | 46 +
.../external/HadoopTaskExecutionRequest.java | 110 ++
.../external/HadoopTaskFinishedMessage.java | 92 ++
.../child/GridHadoopChildProcessRunner.java | 72 +-
.../child/GridHadoopExternalProcessStarter.java | 2 +-
.../GridHadoopCommunicationClient.java | 2 +-
.../GridHadoopExternalCommunication.java | 52 +-
.../GridHadoopMarshallerFilter.java | 2 +-
.../GridHadoopMessageListener.java | 4 +-
.../GridHadoopTcpNioCommunicationClient.java | 2 +-
.../hadoop/v1/GridHadoopV1Counter.java | 4 +-
.../hadoop/v1/GridHadoopV1Reporter.java | 2 +-
.../hadoop/v2/GridHadoopV2Context.java | 2 +-
.../hadoop/v2/GridHadoopV2Counter.java | 4 +-
.../processors/hadoop/v2/GridHadoopV2Job.java | 2 +-
.../v2/GridHadoopV2JobResourceManager.java | 2 +-
.../hadoop/v2/GridHadoopV2TaskContext.java | 4 +-
.../hadoop/GridHadoopAbstractSelfTest.java | 2 +-
.../hadoop/GridHadoopCommandLineTest.java | 4 +-
...idHadoopDefaultMapReducePlannerSelfTest.java | 1005 ------------------
.../hadoop/GridHadoopFileSystemsTest.java | 6 +-
.../hadoop/GridHadoopMapReduceTest.java | 7 +-
.../GridHadoopTestRoundRobinMrPlanner.java | 2 +-
.../HadoopDefaultMapReducePlannerSelfTest.java | 1005 ++++++++++++++++++
...ridHadoopConcurrentHashMultimapSelftest.java | 12 +-
.../collections/GridHadoopHashMapSelfTest.java | 6 +-
.../collections/GridHadoopSkipListSelfTest.java | 14 +-
.../streams/GridHadoopDataStreamSelfTest.java | 4 +-
.../GridHadoopExecutorServiceTest.java | 119 ---
.../taskexecutor/HadoopExecutorServiceTest.java | 119 +++
...GridHadoopExternalCommunicationSelfTest.java | 6 +-
.../testsuites/IgniteHadoopTestSuite.java | 2 +-
129 files changed, 8937 insertions(+), 8937 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/docs/core-site.ignite.xml
----------------------------------------------------------------------
diff --git a/docs/core-site.ignite.xml b/docs/core-site.ignite.xml
index 1146576..8b8e634 100644
--- a/docs/core-site.ignite.xml
+++ b/docs/core-site.ignite.xml
@@ -73,7 +73,7 @@
<!--
<property>
<name>ignite.counters.writer</name>
- <value>org.apache.ignite.internal.processors.hadoop.counter.GridHadoopFSCounterWriter</value>
+ <value>org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter</value>
</property>
-->
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
index d0e487a..a51800e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
@@ -36,7 +36,7 @@ public enum IgniteComponentType {
/** Hadoop. */
HADOOP(
"org.apache.ignite.internal.processors.hadoop.IgniteHadoopNoopProcessor",
- "org.apache.ignite.internal.processors.hadoop.IgniteHadoopProcessor",
+ "org.apache.ignite.internal.processors.hadoop.HadoopProcessor",
"ignite-hadoop"
),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
new file mode 100644
index 0000000..449cff2
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Statistic writer implementation that writes info into any Hadoop file system.
+ */
+public class IgniteHadoopFileSystemCounterWriter implements GridHadoopCounterWriter {
+ /** */
+ public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
+
+ /** */
+ private static final String DEFAULT_USER_NAME = "anonymous";
+
+ /** */
+ public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
+
+ /** */
+ private static final String USER_MACRO = "${USER}";
+
+ /** */
+ private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
+
+ /** {@inheritDoc} */
+ @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs)
+ throws IgniteCheckedException {
+
+ Configuration hadoopCfg = new Configuration();
+
+ for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
+ hadoopCfg.set(e.getKey(), e.getValue());
+
+ String user = jobInfo.user();
+
+ if (F.isEmpty(user))
+ user = DEFAULT_USER_NAME;
+
+ String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
+
+ if (dir == null)
+ dir = DEFAULT_COUNTER_WRITER_DIR;
+
+ Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
+
+ HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
+
+ try {
+ FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
+
+ fs.mkdirs(jobStatPath);
+
+ try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
+ for (T2<String, Long> evt : perfCntr.evts()) {
+ out.print(evt.get1());
+ out.print(':');
+ out.println(evt.get2().toString());
+ }
+
+ out.flush();
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
index bb707c8..d897b6c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
@@ -143,7 +143,7 @@ public class HadoopContext {
* @param meta Job metadata.
* @return {@code true} If local node is participating in job execution.
*/
- public boolean isParticipating(GridHadoopJobMetadata meta) {
+ public boolean isParticipating(HadoopJobMetadata meta) {
UUID locNodeId = localNodeId();
if (locNodeId.equals(meta.submitNodeId()))
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
index c7f0157..ad699ec 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
@@ -31,7 +31,7 @@ import java.util.*;
*/
public class HadoopCounters extends Counters {
/** */
- private final Map<T2<String,String>,GridHadoopLongCounter> cntrs = new HashMap<>();
+ private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
/**
* Creates new instance based on given counters.
@@ -40,8 +40,8 @@ public class HadoopCounters extends Counters {
*/
public HadoopCounters(GridHadoopCounters cntrs) {
for (GridHadoopCounter cntr : cntrs.all())
- if (cntr instanceof GridHadoopLongCounter)
- this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (GridHadoopLongCounter) cntr);
+ if (cntr instanceof HadoopLongCounter)
+ this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
}
/** {@inheritDoc} */
@@ -184,7 +184,7 @@ public class HadoopCounters extends Counters {
public Iterator<Counter> iterateGroup(String grpName) {
Collection<Counter> grpCounters = new ArrayList<>();
- for (GridHadoopLongCounter counter : cntrs.values()) {
+ for (HadoopLongCounter counter : cntrs.values()) {
if (grpName.equals(counter.group()))
grpCounters.add(new GridHadoopV2Counter(counter));
}
@@ -203,12 +203,12 @@ public class HadoopCounters extends Counters {
public Counter findCounter(String grpName, String cntrName, boolean create) {
T2<String, String> key = new T2<>(grpName, cntrName);
- GridHadoopLongCounter internalCntr = cntrs.get(key);
+ HadoopLongCounter internalCntr = cntrs.get(key);
if (internalCntr == null & create) {
- internalCntr = new GridHadoopLongCounter(grpName,cntrName);
+ internalCntr = new HadoopLongCounter(grpName,cntrName);
- cntrs.put(key, new GridHadoopLongCounter(grpName,cntrName));
+ cntrs.put(key, new HadoopLongCounter(grpName,cntrName));
}
return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
index 80fd995..b87e7f8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.*;
*/
public class HadoopImpl implements GridHadoop {
/** Hadoop processor. */
- private final IgniteHadoopProcessor proc;
+ private final HadoopProcessor proc;
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -37,7 +37,7 @@ public class HadoopImpl implements GridHadoop {
*
* @param proc Hadoop processor.
*/
- HadoopImpl(IgniteHadoopProcessor proc) {
+ HadoopImpl(HadoopProcessor proc) {
this.proc = proc;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
new file mode 100644
index 0000000..1f50b0c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.hadoop.planner.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*;
+
+/**
+ * Hadoop processor.
+ */
+public class HadoopProcessor extends IgniteHadoopProcessorAdapter {
+ /** Job ID counter. */
+ private final AtomicInteger idCtr = new AtomicInteger();
+
+ /** Hadoop context. */
+ @GridToStringExclude
+ private HadoopContext hctx;
+
+ /** Hadoop facade for public API. */
+ @GridToStringExclude
+ private GridHadoop hadoop;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public HadoopProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (ctx.isDaemon())
+ return;
+
+ GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
+
+ if (cfg == null)
+ cfg = new GridHadoopConfiguration();
+ else
+ cfg = new GridHadoopConfiguration(cfg);
+
+ initializeDefaults(cfg);
+
+ validate(cfg);
+
+ if (hadoopHome() != null)
+ U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome());
+
+ boolean ok = false;
+
+ try { // Check for Hadoop installation.
+ hadoopUrls();
+
+ ok = true;
+ }
+ catch (IgniteCheckedException e) {
+ U.quietAndWarn(log, e.getMessage());
+ }
+
+ if (ok) {
+ hctx = new HadoopContext(
+ ctx,
+ cfg,
+ new HadoopJobTracker(),
+ cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
+ new HadoopShuffle());
+
+
+ for (HadoopComponent c : hctx.components())
+ c.start(hctx);
+
+ hadoop = new HadoopImpl(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProcessor.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ super.stop(cancel);
+
+ if (hctx == null)
+ return;
+
+ List<HadoopComponent> components = hctx.components();
+
+ for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+ HadoopComponent c = it.previous();
+
+ c.stop(cancel);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
+
+ if (hctx == null)
+ return;
+
+ for (HadoopComponent c : hctx.components())
+ c.onKernalStart();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ if (hctx == null)
+ return;
+
+ List<HadoopComponent> components = hctx.components();
+
+ for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+ HadoopComponent c = it.previous();
+
+ c.onKernalStop(cancel);
+ }
+ }
+
+ /**
+ * Gets Hadoop context.
+ *
+ * @return Hadoop context.
+ */
+ public HadoopContext context() {
+ return hctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridHadoop hadoop() {
+ if (hadoop == null)
+ throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
+ "is HADOOP_HOME environment variable set?)");
+
+ return hadoop;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridHadoopConfiguration config() {
+ return hctx.configuration();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridHadoopJobId nextJobId() {
+ return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
+ return hctx.jobTracker().submit(jobId, jobInfo);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().status(jobId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().jobCounters(jobId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().finishFuture(jobId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().killJob(jobId);
+ }
+
+ /**
+ * Initializes default hadoop configuration.
+ *
+ * @param cfg Hadoop configuration.
+ */
+ private void initializeDefaults(GridHadoopConfiguration cfg) {
+ if (cfg.getMapReducePlanner() == null)
+ cfg.setMapReducePlanner(new HadoopDefaultMapReducePlanner());
+ }
+
+ /**
+ * Validates Grid and Hadoop configuration for correctness.
+ *
+ * @param hadoopCfg Hadoop configuration.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException {
+ if (ctx.config().isPeerClassLoadingEnabled())
+ throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " +
+ "GridConfiguration.setPeerClassLoadingEnabled()).");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
deleted file mode 100644
index 63e4854..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-import org.apache.ignite.internal.processors.hadoop.planner.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*;
-
-/**
- * Hadoop processor.
- */
-public class IgniteHadoopProcessor extends IgniteHadoopProcessorAdapter {
- /** Job ID counter. */
- private final AtomicInteger idCtr = new AtomicInteger();
-
- /** Hadoop context. */
- @GridToStringExclude
- private HadoopContext hctx;
-
- /** Hadoop facade for public API. */
- @GridToStringExclude
- private GridHadoop hadoop;
-
- /**
- * @param ctx Kernal context.
- */
- public IgniteHadoopProcessor(GridKernalContext ctx) {
- super(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- if (ctx.isDaemon())
- return;
-
- GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
-
- if (cfg == null)
- cfg = new GridHadoopConfiguration();
- else
- cfg = new GridHadoopConfiguration(cfg);
-
- initializeDefaults(cfg);
-
- validate(cfg);
-
- if (hadoopHome() != null)
- U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome());
-
- boolean ok = false;
-
- try { // Check for Hadoop installation.
- hadoopUrls();
-
- ok = true;
- }
- catch (IgniteCheckedException e) {
- U.quietAndWarn(log, e.getMessage());
- }
-
- if (ok) {
- hctx = new HadoopContext(
- ctx,
- cfg,
- new HadoopJobTracker(),
- cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
- new HadoopShuffle());
-
-
- for (HadoopComponent c : hctx.components())
- c.start(hctx);
-
- hadoop = new HadoopImpl(this);
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteHadoopProcessor.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- super.stop(cancel);
-
- if (hctx == null)
- return;
-
- List<HadoopComponent> components = hctx.components();
-
- for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
- HadoopComponent c = it.previous();
-
- c.stop(cancel);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
-
- if (hctx == null)
- return;
-
- for (HadoopComponent c : hctx.components())
- c.onKernalStart();
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- super.onKernalStop(cancel);
-
- if (hctx == null)
- return;
-
- List<HadoopComponent> components = hctx.components();
-
- for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
- HadoopComponent c = it.previous();
-
- c.onKernalStop(cancel);
- }
- }
-
- /**
- * Gets Hadoop context.
- *
- * @return Hadoop context.
- */
- public HadoopContext context() {
- return hctx;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoop hadoop() {
- if (hadoop == null)
- throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
- "is HADOOP_HOME environment variable set?)");
-
- return hadoop;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopConfiguration config() {
- return hctx.configuration();
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobId nextJobId() {
- return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
- return hctx.jobTracker().submit(jobId, jobInfo);
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().status(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().jobCounters(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().finishFuture(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().killJob(jobId);
- }
-
- /**
- * Initializes default hadoop configuration.
- *
- * @param cfg Hadoop configuration.
- */
- private void initializeDefaults(GridHadoopConfiguration cfg) {
- if (cfg.getMapReducePlanner() == null)
- cfg.setMapReducePlanner(new GridHadoopDefaultMapReducePlanner());
- }
-
- /**
- * Validates Grid and Hadoop configuration for correctness.
- *
- * @param hadoopCfg Hadoop configuration.
- * @throws IgniteCheckedException If failed.
- */
- private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException {
- if (ctx.config().isPeerClassLoadingEnabled())
- throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " +
- "GridConfiguration.setPeerClassLoadingEnabled()).");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java
deleted file mode 100644
index 9e46846..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Default Hadoop counter implementation.
- */
-public abstract class GridHadoopCounterAdapter implements GridHadoopCounter, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Counter group name. */
- private String grp;
-
- /** Counter name. */
- private String name;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- protected GridHadoopCounterAdapter() {
- // No-op.
- }
-
- /**
- * Creates new counter with given group and name.
- *
- * @param grp Counter group name.
- * @param name Counter name.
- */
- protected GridHadoopCounterAdapter(String grp, String name) {
- assert grp != null : "counter must have group";
- assert name != null : "counter must have name";
-
- this.grp = grp;
- this.name = name;
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public String group() {
- return grp;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(grp);
- out.writeUTF(name);
- writeValue(out);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- grp = in.readUTF();
- name = in.readUTF();
- readValue(in);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- GridHadoopCounterAdapter cntr = (GridHadoopCounterAdapter)o;
-
- if (!grp.equals(cntr.grp))
- return false;
- if (!name.equals(cntr.name))
- return false;
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = grp.hashCode();
- res = 31 * res + name.hashCode();
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridHadoopCounterAdapter.class, this);
- }
-
- /**
- * Writes value of this counter to output.
- *
- * @param out Output.
- * @throws IOException If failed.
- */
- protected abstract void writeValue(ObjectOutput out) throws IOException;
-
- /**
- * Read value of this counter from input.
- *
- * @param in Input.
- * @throws IOException If failed.
- */
- protected abstract void readValue(ObjectInput in) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java
deleted file mode 100644
index 92d54af..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-
-import java.io.*;
-import java.lang.reflect.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Default in-memory counters store.
- */
-public class GridHadoopCountersImpl implements GridHadoopCounters, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final ConcurrentMap<CounterKey, GridHadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
-
- /**
- * Default constructor. Creates new instance without counters.
- */
- public GridHadoopCountersImpl() {
- // No-op.
- }
-
- /**
- * Creates new instance that contain given counters.
- *
- * @param cntrs Counters to store.
- */
- public GridHadoopCountersImpl(Iterable<GridHadoopCounter> cntrs) {
- addCounters(cntrs, true);
- }
-
- /**
- * Copy constructor.
- *
- * @param cntrs Counters to copy.
- */
- public GridHadoopCountersImpl(GridHadoopCounters cntrs) {
- this(cntrs.all());
- }
-
- /**
- * Creates counter instance.
- *
- * @param cls Class of the counter.
- * @param grp Group name.
- * @param name Counter name.
- * @return Counter.
- */
- private <T extends GridHadoopCounter> T createCounter(Class<? extends GridHadoopCounter> cls, String grp,
- String name) {
- try {
- Constructor constructor = cls.getConstructor(String.class, String.class);
-
- return (T)constructor.newInstance(grp, name);
- }
- catch (Exception e) {
- throw new IgniteException(e);
- }
- }
-
- /**
- * Adds counters collection in addition to existing counters.
- *
- * @param cntrs Counters to add.
- * @param cp Whether to copy counters or not.
- */
- private void addCounters(Iterable<GridHadoopCounter> cntrs, boolean cp) {
- assert cntrs != null;
-
- for (GridHadoopCounter cntr : cntrs) {
- if (cp) {
- GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name());
-
- cntrCp.merge(cntr);
-
- cntr = cntrCp;
- }
-
- cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr);
- }
- }
-
- /** {@inheritDoc} */
- @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
- assert cls != null;
-
- CounterKey mapKey = new CounterKey(cls, grp, name);
-
- T cntr = (T)cntrsMap.get(mapKey);
-
- if (cntr == null) {
- cntr = createCounter(cls, grp, name);
-
- T old = (T)cntrsMap.putIfAbsent(mapKey, cntr);
-
- if (old != null)
- return old;
- }
-
- return cntr;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<GridHadoopCounter> all() {
- return cntrsMap.values();
- }
-
- /** {@inheritDoc} */
- @Override public void merge(GridHadoopCounters other) {
- for (GridHadoopCounter counter : other.all())
- counter(counter.group(), counter.name(), counter.getClass()).merge(counter);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeCollection(out, cntrsMap.values());
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- addCounters(U.<GridHadoopCounter>readCollection(in), false);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- GridHadoopCountersImpl counters = (GridHadoopCountersImpl)o;
-
- return cntrsMap.equals(counters.cntrsMap);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return cntrsMap.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridHadoopCountersImpl.class, this, "counters", cntrsMap.values());
- }
-
- /**
- * The tuple of counter identifier components for more readable code.
- */
- private static class CounterKey extends GridTuple3<Class<? extends GridHadoopCounter>, String, String> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Constructor.
- *
- * @param cls Class of the counter.
- * @param grp Group name.
- * @param name Counter name.
- */
- private CounterKey(Class<? extends GridHadoopCounter> cls, String grp, String name) {
- super(cls, grp, name);
- }
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public CounterKey() {
- // No-op.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
deleted file mode 100644
index d603d76..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Statistic writer implementation that writes info into any Hadoop file system.
- */
-public class GridHadoopFSCounterWriter implements GridHadoopCounterWriter {
- /** */
- public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
-
- /** */
- private static final String DEFAULT_USER_NAME = "anonymous";
-
- /** */
- public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
-
- /** */
- private static final String USER_MACRO = "${USER}";
-
- /** */
- private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
-
- /** {@inheritDoc} */
- @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs)
- throws IgniteCheckedException {
-
- Configuration hadoopCfg = new Configuration();
-
- for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
- hadoopCfg.set(e.getKey(), e.getValue());
-
- String user = jobInfo.user();
-
- if (F.isEmpty(user))
- user = DEFAULT_USER_NAME;
-
- String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
-
- if (dir == null)
- dir = DEFAULT_COUNTER_WRITER_DIR;
-
- Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
-
- GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(cntrs, null);
-
- try {
- FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
-
- fs.mkdirs(jobStatPath);
-
- try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
- for (T2<String, Long> evt : perfCntr.evts()) {
- out.print(evt.get1());
- out.print(':');
- out.println(evt.get2().toString());
- }
-
- out.flush();
- }
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java
deleted file mode 100644
index 67af49f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.io.*;
-
-/**
- * Standard hadoop counter to use via original Hadoop API in Hadoop jobs.
- */
-public class GridHadoopLongCounter extends GridHadoopCounterAdapter {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** The counter value. */
- private long val;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- public GridHadoopLongCounter() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param grp Group name.
- * @param name Counter name.
- */
- public GridHadoopLongCounter(String grp, String name) {
- super(grp, name);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeValue(ObjectOutput out) throws IOException {
- out.writeLong(val);
- }
-
- /** {@inheritDoc} */
- @Override protected void readValue(ObjectInput in) throws IOException {
- val = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public void merge(GridHadoopCounter cntr) {
- val += ((GridHadoopLongCounter)cntr).val;
- }
-
- /**
- * Gets current value of this counter.
- *
- * @return Current value.
- */
- public long value() {
- return val;
- }
-
- /**
- * Sets current value by the given value.
- *
- * @param val Value to set.
- */
- public void value(long val) {
- this.val = val;
- }
-
- /**
- * Increment this counter by the given value.
- *
- * @param i Value to increase this counter by.
- */
- public void increment(long i) {
- val += i;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
deleted file mode 100644
index 263a075..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
-
-/**
- * Counter for the job statistics accumulation.
- */
-public class GridHadoopPerformanceCounter extends GridHadoopCounterAdapter {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** The group name for this counter. */
- private static final String GROUP_NAME = "SYSTEM";
-
- /** The counter name for this counter. */
- private static final String COUNTER_NAME = "PERFORMANCE";
-
- /** Events collections. */
- private Collection<T2<String,Long>> evts = new ArrayList<>();
-
- /** Node id to insert into the event info. */
- private UUID nodeId;
-
- /** */
- private int reducerNum;
-
- /** */
- private volatile Long firstShuffleMsg;
-
- /** */
- private volatile Long lastShuffleMsg;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- public GridHadoopPerformanceCounter() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param grp Group name.
- * @param name Counter name.
- */
- public GridHadoopPerformanceCounter(String grp, String name) {
- super(grp, name);
- }
-
- /**
- * Constructor to create instance to use this as helper.
- *
- * @param nodeId Id of the work node.
- */
- public GridHadoopPerformanceCounter(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /** {@inheritDoc} */
- @Override protected void writeValue(ObjectOutput out) throws IOException {
- U.writeCollection(out, evts);
- }
-
- /** {@inheritDoc} */
- @Override protected void readValue(ObjectInput in) throws IOException {
- try {
- evts = U.readCollection(in);
- }
- catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void merge(GridHadoopCounter cntr) {
- evts.addAll(((GridHadoopPerformanceCounter)cntr).evts);
- }
-
- /**
- * Gets the events collection.
- *
- * @return Collection of event.
- */
- public Collection<T2<String, Long>> evts() {
- return evts;
- }
-
- /**
- * Generate name that consists of some event information.
- *
- * @param info Task info.
- * @param evtType The type of the event.
- * @return String contains necessary event information.
- */
- private String eventName(GridHadoopTaskInfo info, String evtType) {
- return eventName(info.type().toString(), info.taskNumber(), evtType);
- }
-
- /**
- * Generate name that consists of some event information.
- *
- * @param taskType Task type.
- * @param taskNum Number of the task.
- * @param evtType The type of the event.
- * @return String contains necessary event information.
- */
- private String eventName(String taskType, int taskNum, String evtType) {
- assert nodeId != null;
-
- return taskType + " " + taskNum + " " + evtType + " " + nodeId;
- }
-
- /**
- * Adds event of the task submission (task instance creation).
- *
- * @param info Task info.
- * @param ts Timestamp of the event.
- */
- public void onTaskSubmit(GridHadoopTaskInfo info, long ts) {
- evts.add(new T2<>(eventName(info, "submit"), ts));
- }
-
- /**
- * Adds event of the task preparation.
- *
- * @param info Task info.
- * @param ts Timestamp of the event.
- */
- public void onTaskPrepare(GridHadoopTaskInfo info, long ts) {
- evts.add(new T2<>(eventName(info, "prepare"), ts));
- }
-
- /**
- * Adds event of the task finish.
- *
- * @param info Task info.
- * @param ts Timestamp of the event.
- */
- public void onTaskFinish(GridHadoopTaskInfo info, long ts) {
- if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) {
- evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
- evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
-
- lastShuffleMsg = null;
- }
-
- evts.add(new T2<>(eventName(info, "finish"), ts));
- }
-
- /**
- * Adds event of the task run.
- *
- * @param info Task info.
- * @param ts Timestamp of the event.
- */
- public void onTaskStart(GridHadoopTaskInfo info, long ts) {
- evts.add(new T2<>(eventName(info, "start"), ts));
- }
-
- /**
- * Adds event of the job preparation.
- *
- * @param ts Timestamp of the event.
- */
- public void onJobPrepare(long ts) {
- assert nodeId != null;
-
- evts.add(new T2<>("JOB prepare " + nodeId, ts));
- }
-
- /**
- * Adds event of the job start.
- *
- * @param ts Timestamp of the event.
- */
- public void onJobStart(long ts) {
- assert nodeId != null;
-
- evts.add(new T2<>("JOB start " + nodeId, ts));
- }
-
- /**
- * Adds client submission events from job info.
- *
- * @param info Job info.
- */
- public void clientSubmissionEvents(GridHadoopJobInfo info) {
- assert nodeId != null;
-
- addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
- addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
- addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
- }
-
- /**
- * Adds event with timestamp from some property in job info.
- *
- * @param evt Event type and phase.
- * @param info Job info.
- * @param propName Property name to get timestamp.
- */
- private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) {
- String val = info.property(propName);
-
- if (!F.isEmpty(val)) {
- try {
- evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val)));
- }
- catch (NumberFormatException e) {
- throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e);
- }
- }
- }
-
- /**
- * Registers shuffle message event.
- *
- * @param reducerNum Number of reducer that receives the data.
- * @param ts Timestamp of the event.
- */
- public void onShuffleMessage(int reducerNum, long ts) {
- this.reducerNum = reducerNum;
-
- if (firstShuffleMsg == null)
- firstShuffleMsg = ts;
-
- lastShuffleMsg = ts;
- }
-
- /**
- * Gets system predefined performance counter from the GridHadoopCounters object.
- *
- * @param cntrs GridHadoopCounters object.
- * @param nodeId Node id for methods that adds events. It may be null if you don't use ones.
- * @return Predefined performance counter.
- */
- public static GridHadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) {
- GridHadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, GridHadoopPerformanceCounter.class);
-
- if (nodeId != null)
- cntr.nodeId(nodeId);
-
- return cntrs.counter(GROUP_NAME, COUNTER_NAME, GridHadoopPerformanceCounter.class);
- }
-
- /**
- * Sets the nodeId field.
- *
- * @param nodeId Node id.
- */
- private void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
new file mode 100644
index 0000000..3fdce14
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Default Hadoop counter implementation.
+ */
+public abstract class HadoopCounterAdapter implements GridHadoopCounter, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Counter group name. */
+ private String grp;
+
+ /** Counter name. */
+ private String name;
+
+ /**
+ * Default constructor required by {@link Externalizable}.
+ */
+ protected HadoopCounterAdapter() {
+ // No-op.
+ }
+
+ /**
+ * Creates new counter with given group and name.
+ *
+ * @param grp Counter group name.
+ * @param name Counter name.
+ */
+ protected HadoopCounterAdapter(String grp, String name) {
+ assert grp != null : "counter must have group";
+ assert name != null : "counter must have name";
+
+ this.grp = grp;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public String group() {
+ return grp;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(grp);
+ out.writeUTF(name);
+ writeValue(out);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ grp = in.readUTF();
+ name = in.readUTF();
+ readValue(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ HadoopCounterAdapter cntr = (HadoopCounterAdapter)o;
+
+ if (!grp.equals(cntr.grp))
+ return false;
+ if (!name.equals(cntr.name))
+ return false;
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = grp.hashCode();
+ res = 31 * res + name.hashCode();
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopCounterAdapter.class, this);
+ }
+
+ /**
+ * Writes value of this counter to output.
+ *
+ * @param out Output.
+ * @throws IOException If failed.
+ */
+ protected abstract void writeValue(ObjectOutput out) throws IOException;
+
+ /**
+ * Read value of this counter from input.
+ *
+ * @param in Input.
+ * @throws IOException If failed.
+ */
+ protected abstract void readValue(ObjectInput in) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
new file mode 100644
index 0000000..01b1473
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Default in-memory counters store.
+ */
+public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final ConcurrentMap<CounterKey, GridHadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
+
+ /**
+ * Default constructor. Creates new instance without counters.
+ */
+ public HadoopCountersImpl() {
+ // No-op.
+ }
+
+ /**
+ * Creates new instance that contain given counters.
+ *
+ * @param cntrs Counters to store.
+ */
+ public HadoopCountersImpl(Iterable<GridHadoopCounter> cntrs) {
+ addCounters(cntrs, true);
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param cntrs Counters to copy.
+ */
+ public HadoopCountersImpl(GridHadoopCounters cntrs) {
+ this(cntrs.all());
+ }
+
+ /**
+ * Creates counter instance.
+ *
+ * @param cls Class of the counter.
+ * @param grp Group name.
+ * @param name Counter name.
+ * @return Counter.
+ */
+ private <T extends GridHadoopCounter> T createCounter(Class<? extends GridHadoopCounter> cls, String grp,
+ String name) {
+ try {
+ Constructor constructor = cls.getConstructor(String.class, String.class);
+
+ return (T)constructor.newInstance(grp, name);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Adds counters collection in addition to existing counters.
+ *
+ * @param cntrs Counters to add.
+ * @param cp Whether to copy counters or not.
+ */
+ private void addCounters(Iterable<GridHadoopCounter> cntrs, boolean cp) {
+ assert cntrs != null;
+
+ for (GridHadoopCounter cntr : cntrs) {
+ if (cp) {
+ GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name());
+
+ cntrCp.merge(cntr);
+
+ cntr = cntrCp;
+ }
+
+ cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
+ assert cls != null;
+
+ CounterKey mapKey = new CounterKey(cls, grp, name);
+
+ T cntr = (T)cntrsMap.get(mapKey);
+
+ if (cntr == null) {
+ cntr = createCounter(cls, grp, name);
+
+ T old = (T)cntrsMap.putIfAbsent(mapKey, cntr);
+
+ if (old != null)
+ return old;
+ }
+
+ return cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<GridHadoopCounter> all() {
+ return cntrsMap.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void merge(GridHadoopCounters other) {
+ for (GridHadoopCounter counter : other.all())
+ counter(counter.group(), counter.name(), counter.getClass()).merge(counter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeCollection(out, cntrsMap.values());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ addCounters(U.<GridHadoopCounter>readCollection(in), false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ HadoopCountersImpl counters = (HadoopCountersImpl)o;
+
+ return cntrsMap.equals(counters.cntrsMap);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return cntrsMap.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopCountersImpl.class, this, "counters", cntrsMap.values());
+ }
+
+ /**
+ * The tuple of counter identifier components for more readable code.
+ */
+ private static class CounterKey extends GridTuple3<Class<? extends GridHadoopCounter>, String, String> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Constructor.
+ *
+ * @param cls Class of the counter.
+ * @param grp Group name.
+ * @param name Counter name.
+ */
+ private CounterKey(Class<? extends GridHadoopCounter> cls, String grp, String name) {
+ super(cls, grp, name);
+ }
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public CounterKey() {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
new file mode 100644
index 0000000..1aa1e0e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+
+import java.io.*;
+
+/**
+ * Standard hadoop counter to use via original Hadoop API in Hadoop jobs.
+ */
+public class HadoopLongCounter extends HadoopCounterAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** The counter value. */
+ private long val;
+
+ /**
+ * Default constructor required by {@link Externalizable}.
+ */
+ public HadoopLongCounter() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param grp Group name.
+ * @param name Counter name.
+ */
+ public HadoopLongCounter(String grp, String name) {
+ super(grp, name);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeValue(ObjectOutput out) throws IOException {
+ out.writeLong(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readValue(ObjectInput in) throws IOException {
+ val = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void merge(GridHadoopCounter cntr) {
+ val += ((HadoopLongCounter)cntr).val;
+ }
+
+ /**
+ * Gets current value of this counter.
+ *
+ * @return Current value.
+ */
+ public long value() {
+ return val;
+ }
+
+ /**
+ * Sets current value by the given value.
+ *
+ * @param val Value to set.
+ */
+ public void value(long val) {
+ this.val = val;
+ }
+
+ /**
+ * Increment this counter by the given value.
+ *
+ * @param i Value to increase this counter by.
+ */
+ public void increment(long i) {
+ val += i;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
new file mode 100644
index 0000000..f22d0cd
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Counter for the job statistics accumulation.
+ */
+public class HadoopPerformanceCounter extends HadoopCounterAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** The group name for this counter. */
+ private static final String GROUP_NAME = "SYSTEM";
+
+ /** The counter name for this counter. */
+ private static final String COUNTER_NAME = "PERFORMANCE";
+
+ /** Events collections. */
+ private Collection<T2<String,Long>> evts = new ArrayList<>();
+
+ /** Node id to insert into the event info. */
+ private UUID nodeId;
+
+ /** */
+ private int reducerNum;
+
+ /** */
+ private volatile Long firstShuffleMsg;
+
+ /** */
+ private volatile Long lastShuffleMsg;
+
+ /**
+ * Default constructor required by {@link Externalizable}.
+ */
+ public HadoopPerformanceCounter() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param grp Group name.
+ * @param name Counter name.
+ */
+ public HadoopPerformanceCounter(String grp, String name) {
+ super(grp, name);
+ }
+
+ /**
+ * Constructor to create instance to use this as helper.
+ *
+ * @param nodeId Id of the work node.
+ */
+ public HadoopPerformanceCounter(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeValue(ObjectOutput out) throws IOException {
+ U.writeCollection(out, evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readValue(ObjectInput in) throws IOException {
+ try {
+ evts = U.readCollection(in);
+ }
+ catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void merge(GridHadoopCounter cntr) {
+ evts.addAll(((HadoopPerformanceCounter)cntr).evts);
+ }
+
+ /**
+ * Gets the events collection.
+ *
+ * @return Collection of event.
+ */
+ public Collection<T2<String, Long>> evts() {
+ return evts;
+ }
+
+ /**
+ * Generate name that consists of some event information.
+ *
+ * @param info Task info.
+ * @param evtType The type of the event.
+ * @return String contains necessary event information.
+ */
+ private String eventName(GridHadoopTaskInfo info, String evtType) {
+ return eventName(info.type().toString(), info.taskNumber(), evtType);
+ }
+
+ /**
+ * Generate name that consists of some event information.
+ *
+ * @param taskType Task type.
+ * @param taskNum Number of the task.
+ * @param evtType The type of the event.
+ * @return String contains necessary event information.
+ */
+ private String eventName(String taskType, int taskNum, String evtType) {
+ assert nodeId != null;
+
+ return taskType + " " + taskNum + " " + evtType + " " + nodeId;
+ }
+
+ /**
+ * Adds event of the task submission (task instance creation).
+ *
+ * @param info Task info.
+ * @param ts Timestamp of the event.
+ */
+ public void onTaskSubmit(GridHadoopTaskInfo info, long ts) {
+ evts.add(new T2<>(eventName(info, "submit"), ts));
+ }
+
+ /**
+ * Adds event of the task preparation.
+ *
+ * @param info Task info.
+ * @param ts Timestamp of the event.
+ */
+ public void onTaskPrepare(GridHadoopTaskInfo info, long ts) {
+ evts.add(new T2<>(eventName(info, "prepare"), ts));
+ }
+
+ /**
+ * Adds event of the task finish.
+ *
+ * @param info Task info.
+ * @param ts Timestamp of the event.
+ */
+ public void onTaskFinish(GridHadoopTaskInfo info, long ts) {
+ if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) {
+ evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
+ evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
+
+ lastShuffleMsg = null;
+ }
+
+ evts.add(new T2<>(eventName(info, "finish"), ts));
+ }
+
+ /**
+ * Adds event of the task run.
+ *
+ * @param info Task info.
+ * @param ts Timestamp of the event.
+ */
+ public void onTaskStart(GridHadoopTaskInfo info, long ts) {
+ evts.add(new T2<>(eventName(info, "start"), ts));
+ }
+
+ /**
+ * Adds event of the job preparation.
+ *
+ * @param ts Timestamp of the event.
+ */
+ public void onJobPrepare(long ts) {
+ assert nodeId != null;
+
+ evts.add(new T2<>("JOB prepare " + nodeId, ts));
+ }
+
+ /**
+ * Adds event of the job start.
+ *
+ * @param ts Timestamp of the event.
+ */
+ public void onJobStart(long ts) {
+ assert nodeId != null;
+
+ evts.add(new T2<>("JOB start " + nodeId, ts));
+ }
+
+ /**
+ * Adds client submission events from job info.
+ *
+ * @param info Job info.
+ */
+ public void clientSubmissionEvents(GridHadoopJobInfo info) {
+ assert nodeId != null;
+
+ addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
+ addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
+ addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
+ }
+
+ /**
+ * Adds event with timestamp from some property in job info.
+ *
+ * @param evt Event type and phase.
+ * @param info Job info.
+ * @param propName Property name to get timestamp.
+ */
+ private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) {
+ String val = info.property(propName);
+
+ if (!F.isEmpty(val)) {
+ try {
+ evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val)));
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e);
+ }
+ }
+ }
+
+ /**
+ * Registers shuffle message event.
+ *
+ * @param reducerNum Number of reducer that receives the data.
+ * @param ts Timestamp of the event.
+ */
+ public void onShuffleMessage(int reducerNum, long ts) {
+ this.reducerNum = reducerNum;
+
+ if (firstShuffleMsg == null)
+ firstShuffleMsg = ts;
+
+ lastShuffleMsg = ts;
+ }
+
+ /**
+ * Gets system predefined performance counter from the GridHadoopCounters object.
+ *
+ * @param cntrs GridHadoopCounters object.
+ * @param nodeId Node id for methods that adds events. It may be null if you don't use ones.
+ * @return Predefined performance counter.
+ */
+ public static HadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) {
+ HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
+
+ if (nodeId != null)
+ cntr.nodeId(nodeId);
+
+ return cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
+ }
+
+ /**
+ * Sets the nodeId field.
+ *
+ * @param nodeId Node id.
+ */
+ private void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
deleted file mode 100644
index e9461e2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.IgfsConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class GridHadoopDistributedFileSystem extends DistributedFileSystem {
- /** User name for each thread. */
- private final ThreadLocal<String> userName = new ThreadLocal<String>() {
- /** {@inheritDoc} */
- @Override protected String initialValue() {
- return DFLT_USER_NAME;
- }
- };
-
- /** Working directory for each thread. */
- private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
- /** {@inheritDoc} */
- @Override protected Path initialValue() {
- return getHomeDirectory();
- }
- };
-
- /** {@inheritDoc} */
- @Override public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
-
- setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
- }
-
- /**
- * Set user name and default working directory for current thread.
- *
- * @param userName User name.
- */
- public void setUser(String userName) {
- this.userName.set(userName);
-
- setWorkingDirectory(getHomeDirectory());
- }
-
- /** {@inheritDoc} */
- @Override public Path getHomeDirectory() {
- Path path = new Path("/user/" + userName.get());
-
- return path.makeQualified(getUri(), null);
- }
-
- /** {@inheritDoc} */
- @Override public void setWorkingDirectory(Path dir) {
- Path fixedDir = fixRelativePart(dir);
-
- String res = fixedDir.toUri().getPath();
-
- if (!DFSUtil.isValidName(res))
- throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
- workingDir.set(fixedDir);
- }
-
- /** {@inheritDoc} */
- @Override public Path getWorkingDirectory() {
- return workingDir.get();
- }
-}