You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/06/19 09:47:01 UTC

[11/16] oozie git commit: OOZIE-2339 [fluent-job] Minimum Viable Fluent Job API (daniel.becker, andras.piros via rkanter, gezapeti, pbacsko)

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/Streaming.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/Streaming.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/Streaming.java
new file mode 100644
index 0000000..cfd8314
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/Streaming.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.action;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.List;
+
+/**
+ * A class representing the streaming information within a {@link MapReduceAction}.
+ *
+ * Instances of this class should be built using the builder {@link StreamingBuilder}.
+ *
+ * The properties of the builder can only be set once, an attempt to set them a second time will trigger
+ * an {@link IllegalStateException}.
+ *
+ * Builder instances can be used to build several elements, although properties already set cannot be changed after
+ * a call to {@link StreamingBuilder#build} either.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Streaming {
+    private final String mapper;
+    private final String reducer;
+    private final String recordReader;
+    private final ImmutableList<String> recordReaderMappings;
+    private final ImmutableList<String> envs;
+
+    Streaming(final String mapper,
+              final String reducer,
+              final String recordReader,
+              final ImmutableList<String> recordReaderMappings,
+              final ImmutableList<String> envs) {
+        this.mapper = mapper;
+        this.reducer = reducer;
+        this.recordReader = recordReader;
+        this.recordReaderMappings = recordReaderMappings;
+        this.envs = envs;
+    }
+
+    /**
+     * Returns the mapper of this {@link Streaming} object.
+     * @return The mapper of this {@link Streaming} object.
+     */
+    public String getMapper() {
+        return mapper;
+    }
+
+    /**
+     * Returns the reducer of this {@link Streaming} object.
+     * @return The reducer of this {@link Streaming} object.
+     */
+    public String getReducer() {
+        return reducer;
+    }
+
+    /**
+     * Returns the record reader of this {@link Streaming} object.
+     * @return The record reader of this {@link Streaming} object.
+     */
+    public String getRecordReader() {
+        return recordReader;
+    }
+
+    /**
+     * Returns the record reader mappings of this {@link Streaming} object as a list.
+     * @return The record reader mappings of this {@link Streaming} object as a list.
+     */
+    public List<String> getRecordReaderMappings() {
+        return recordReaderMappings;
+    }
+
+    /**
+     * Returns the environment variables of this {@link Streaming} object as a list.
+     * @return The environment variables of this {@link Streaming} object as a list.
+     */
+    public List<String> getEnvs() {
+        return envs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/StreamingBuilder.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/StreamingBuilder.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/StreamingBuilder.java
new file mode 100644
index 0000000..ab52969
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/StreamingBuilder.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.action;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.oozie.fluentjob.api.ModifyOnce;
+
+/**
+ * A builder class for {@link Streaming}.
+ *
+ * The properties of the builder can only be set once, an attempt to set them a second time will trigger
+ * an {@link IllegalStateException}. The properties that are lists are an exception to this rule, of course multiple
+ * elements can be added / removed.
+ *
+ * Builder instances can be used to build several elements, although properties already set cannot be changed after
+ * a call to {@link StreamingBuilder#build} either.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class StreamingBuilder implements Builder<Streaming> {
+    private final ModifyOnce<String> mapper;
+    private final ModifyOnce<String> reducer;
+    private final ModifyOnce<String> recordReader;
+    private final ImmutableList.Builder<String> recordReaderMappings;
+    private final ImmutableList.Builder<String> envs;
+
+    /**
+     * Creates a new {@link StreamingBuilder}.
+     */
+    public StreamingBuilder() {
+        mapper = new ModifyOnce<>();
+        reducer = new ModifyOnce<>();
+        recordReader = new ModifyOnce<>();
+
+        recordReaderMappings = new ImmutableList.Builder<>();
+        envs = new ImmutableList.Builder<>();
+    }
+
+    /**
+     * Registers a mapper with this builder.
+     * @param mapper The mapper to register with this builder.
+     * @return This builder.
+     */
+    public StreamingBuilder withMapper(final String mapper) {
+        this.mapper.set(mapper);
+        return this;
+    }
+
+    /**
+     * Registers a reducer with this builder.
+     * @param reducer The reducer to register with this builder.
+     * @return This builder.
+     */
+    public StreamingBuilder withReducer(final String reducer) {
+        this.reducer.set(reducer);
+        return this;
+    }
+
+    /**
+     * Registers a record reader with this builder.
+     * @param recordReader The record reader to register with this builder.
+     * @return This builder.
+     */
+    public StreamingBuilder withRecordReader(final String recordReader) {
+        this.recordReader.set(recordReader);
+        return this;
+    }
+
+    /**
+     * Registers a record reader mapping with this builder.
+     * @param recordReaderMapping The record reader mapping to register with this builder.
+     * @return This builder.
+     */
+    public StreamingBuilder withRecordReaderMapping(final String recordReaderMapping) {
+        this.recordReaderMappings.add(recordReaderMapping);
+        return this;
+    }
+
+    /**
+     * Registers an environment variable with this builder.
+     * @param env The environment variable to register with this builder.
+     * @return This builder.
+     */
+    public StreamingBuilder withEnv(final String env) {
+        this.envs.add(env);
+        return this;
+    }
+
+    /**
+     * Creates a new {@link Streaming} object with the properties stores in this builder.
+     * The new {@link Streaming} object is independent of this builder and the builder can be used to build
+     * new instances.
+     * @return A new {@link Streaming} object with the properties stored in this builder.
+     */
+    @Override
+    public Streaming build() {
+        return new Streaming(mapper.get(), reducer.get(), recordReader.get(), recordReaderMappings.build(), envs.build());
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowAction.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowAction.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowAction.java
new file mode 100644
index 0000000..238bc13
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowAction.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.action;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * A class representing the Oozie subworkflow action.
+ * Instances of this class should be built using the builder {@link SubWorkflowActionBuilder}.
+ *
+ * The properties of the builder can only be set once, an attempt to set them a second time will trigger
+ * an {@link IllegalStateException}.
+ *
+ * Builder instances can be used to build several elements, although properties already set cannot be changed after
+ * a call to {@link SubWorkflowActionBuilder#build} either.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SubWorkflowAction extends Node {
+    private final String appPath;
+    private final boolean propagateConfiguration;
+    private final ImmutableMap<String, String> configuration;
+
+    SubWorkflowAction(final Node.ConstructionData constructionData,
+                      final String appPath,
+                      final boolean propagateConfiguration,
+                      final ImmutableMap<String, String> configuration) {
+        super(constructionData);
+
+        this.appPath = appPath;
+        this.propagateConfiguration = propagateConfiguration;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Returns the path to the application definition (usually workflow.xml) of the subworkflow.
+     * @return The path to the application definition (usually workflow.xml) of the subworkflow.
+     */
+    public String getAppPath() {
+        return appPath;
+    }
+
+    /**
+     * Returns whether the configuration of the main workflow should propagate to the subworkflow.
+     * @return {@code true} if the configuration of the main workflow should propagate to the subworkflow;
+     *         {@code false} otherwise.
+     */
+    public boolean isPropagatingConfiguration() {
+        return propagateConfiguration;
+    }
+
+    /**
+     * Returns the value associated with the provided configuration property name.
+     * @param property The name of the configuration property for which the value will be returned.
+     * @return The value associated with the provided configuration property name.
+     */
+    public String getConfigProperty(final String property) {
+        return configuration.get(property);
+    }
+
+    /**
+     * Returns an immutable map of the configuration key-value pairs stored in this {@link MapReduceAction} object.
+     * @return An immutable map of the configuration key-value pairs stored in this {@link MapReduceAction} object.
+     */
+    public Map<String, String> getConfiguration() {
+        return configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowActionBuilder.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowActionBuilder.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowActionBuilder.java
new file mode 100644
index 0000000..07761d4
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowActionBuilder.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.action;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.oozie.fluentjob.api.ModifyOnce;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A builder class for {@link SubWorkflowAction}.
+ *
+ * The properties of the builder can only be set once, an attempt to set them a second time will trigger
+ * an {@link IllegalStateException}. The properties that are lists are an exception to this rule, of course multiple
+ * elements can be added / removed.
+ *
+ * Builder instances can be used to build several elements, although properties already set cannot be changed after
+ * a call to {@link SubWorkflowActionBuilder#build} either.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SubWorkflowActionBuilder
+        extends NodeBuilderBaseImpl<SubWorkflowActionBuilder> implements Builder<SubWorkflowAction> {
+    private final ModifyOnce<String> appPath;
+    private final ModifyOnce<Boolean> propagateConfiguration;
+    private final Map<String, ModifyOnce<String>> configuration;
+
+    /**
+     * Creates and returns an empty builder.
+     * @return An empty builder.
+     */
+    public static SubWorkflowActionBuilder create() {
+        final ModifyOnce<String> appPath = new ModifyOnce<>();
+        final ModifyOnce<Boolean> propagateConfiguration = new ModifyOnce<>(false);
+        final Map<String, ModifyOnce<String>> configuration = new LinkedHashMap<>();
+
+        return new SubWorkflowActionBuilder(null, appPath, propagateConfiguration, configuration);
+    }
+
+    /**
+     * Create and return a new {@link SubWorkflowActionBuilder} that is based on an already built
+     * {@link SubWorkflowAction} object. The properties of the builder will initially be the same as those of the
+     * provided {@link SubWorkflowAction} object, but it is possible to modify them once.
+     * @param action The {@link SubWorkflowAction} object on which this {@link SubWorkflowActionBuilder} will be based.
+     * @return A new {@link SubWorkflowActionBuilder} that is based on a previously built {@link SubWorkflowAction} object.
+     */
+    public static SubWorkflowActionBuilder createFromExistingAction(final SubWorkflowAction action) {
+        final ModifyOnce<String> appPath = new ModifyOnce<>(action.getAppPath());
+        final ModifyOnce<Boolean> propagateConfiguration = new ModifyOnce<>(action.isPropagatingConfiguration());
+        final Map<String, ModifyOnce<String>> configuration =
+                ActionAttributesBuilder.convertToModifyOnceMap(action.getConfiguration());
+
+        return new SubWorkflowActionBuilder(action, appPath, propagateConfiguration, configuration);
+    }
+
+    SubWorkflowActionBuilder(final SubWorkflowAction action,
+                             final ModifyOnce<String> appPath,
+                             final ModifyOnce<Boolean> propagateConfiguration,
+                             final Map<String, ModifyOnce<String>> configuration) {
+        super(action);
+
+        this.appPath = appPath;
+        this.propagateConfiguration = propagateConfiguration;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Registers the path to the application definition (usually workflow.xml) of the subworkflow.
+     * @param appPath HDFS application path
+     * @return This builder.
+     */
+    public SubWorkflowActionBuilder withAppPath(final String appPath) {
+        this.appPath.set(appPath);
+        return this;
+    }
+
+    /**
+     * Registers that the configuration of the main workflow should propagate to the subworkflow.
+     * @return This builder.
+     */
+    public SubWorkflowActionBuilder withPropagatingConfiguration() {
+        this.propagateConfiguration.set(true);
+        return this;
+    }
+
+    /**
+     * Registers that the configuration of the main workflow should NOT propagate to the subworkflow.
+     * @return This builder.
+     */
+    public SubWorkflowActionBuilder withoutPropagatingConfiguration() {
+        this.propagateConfiguration.set(false);
+        return this;
+    }
+
+    /**
+     * Registers a configuration property (a key-value pair) with this builder. If the provided key has already been
+     * set on this builder, an exception is thrown. Setting a key to null means deleting it.
+     * @param key The name of the property to set.
+     * @param value The value of the property to set.
+     * @return this
+     * @throws IllegalStateException if the provided key has already been set on this builder.
+     */
+    public SubWorkflowActionBuilder withConfigProperty(final String key, final String value) {
+        ModifyOnce<String> mappedValue = this.configuration.get(key);
+
+        if (mappedValue == null) {
+            mappedValue = new ModifyOnce<>(value);
+            this.configuration.put(key, mappedValue);
+        }
+
+        mappedValue.set(value);
+
+        return this;
+    }
+
+    /**
+     * Creates a new {@link SubWorkflowAction} object with the properties stores in this builder.
+     * The new {@link MapReduceAction} object is independent of this builder and the builder can be used to build
+     * new instances.
+     * @return A new {@link MapReduceAction} object with the properties stored in this builder.
+     */
+    @Override
+    public SubWorkflowAction build() {
+        final Node.ConstructionData constructionData = getConstructionData();
+
+        final SubWorkflowAction instance = new SubWorkflowAction(
+                constructionData,
+                appPath.get(),
+                propagateConfiguration.get(),
+                ActionAttributesBuilder.convertToConfigurationMap(configuration));
+
+        addAsChildToAllParents(instance);
+
+        return instance;
+    }
+
+    @Override
+    protected SubWorkflowActionBuilder getRuntimeSelfReference() {
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/Touchz.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/Touchz.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/Touchz.java
new file mode 100644
index 0000000..11c655a
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/Touchz.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.action;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A class representing the touchz command of {@link FSAction}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Touchz {
+    private final String path;
+
+    /**
+     * Creates a new {@link Touchz} object.
+     * @param path The path of the file that the touch operation will be executed on.
+     */
+    public Touchz(final String path) {
+        this.path = path;
+    }
+
+    /**
+     * Returns the path of the file that the touch operation will be executed on.
+     * @return The path of the file that the touch operation will be executed on.
+     */
+    public String getPath() {
+        return path;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/DagNodeWithCondition.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/DagNodeWithCondition.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/DagNodeWithCondition.java
new file mode 100644
index 0000000..a0ef83a
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/DagNodeWithCondition.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.dag;
+
+import org.apache.oozie.fluentjob.api.Condition;
+
+import java.util.Collection;
+
+/**
+ * This is a class bundling together a {@link NodeBase} object and a {@link Condition} object. There are no restrictions
+ * as to whether the node should be a parent with an outgoing conditional path or a child with an incoming conditional path.
+ */
+public class DagNodeWithCondition {
+    private final NodeBase node;
+    private final Condition condition;
+
+    /**
+     * Removes the first {@link DagNodeWithCondition} object from a collection that has the provided node as its node.
+     * If there is no such object in the collection, this method returns false;
+     * @param collection The collection from which to remove an element
+     * @param node The node to remove together with its condition.
+     * @return {@code true} if an element was removed; {@code false} if no matching element was contained in the collection.
+     */
+    public static boolean removeFromCollection(final Collection<DagNodeWithCondition> collection, final NodeBase node) {
+        DagNodeWithCondition element = null;
+        for (final DagNodeWithCondition nodeWithCondition : collection) {
+            if (node.equals(nodeWithCondition.getNode())) {
+                element = nodeWithCondition;
+            }
+        }
+
+        if (element != null) {
+            collection.remove(element);
+        }
+
+        return element != null;
+    }
+
+    /**
+     * Creates a new {@link DagNodeWithCondition} object.
+     * @param node A {@link NodeBase} object.
+     * @param condition A {@link Condition} object.
+     */
+    public DagNodeWithCondition(final NodeBase node,
+                                final Condition condition) {
+        this.node = node;
+        this.condition = condition;
+    }
+
+    /**
+     * Returns the {@link NodeBase} object of this {@link DagNodeWithCondition}.
+     * @return The {@link NodeBase} object of this {@link DagNodeWithCondition}.
+     */
+    public NodeBase getNode() {
+        return node;
+    }
+
+    /**
+     * Returns the {@link Condition} object of this {@link DagNodeWithCondition}.
+     * @return The {@link Condition} object of this {@link DagNodeWithCondition}.
+     */
+    public Condition getCondition() {
+        return condition;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final DagNodeWithCondition that = (DagNodeWithCondition) o;
+
+        if (node != null ? !node.equals(that.node) : that.node != null) {
+            return false;
+        }
+
+        return condition != null ? condition.equals(that.condition) : that.condition == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = node != null ? node.hashCode() : 0;
+        result = 31 * result + (condition != null ? condition.hashCode() : 0);
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Decision.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Decision.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Decision.java
new file mode 100644
index 0000000..1b1a742
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Decision.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.dag;
+
+import com.google.common.base.Preconditions;
+import org.apache.oozie.fluentjob.api.Condition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A class representing decision nodes in an Oozie workflow definition DAG. These nodes are generated automatically,
+ * the end user should not need to use this class directly.
+ */
+public class Decision extends NodeBase {
+    private NodeBase parent;
+    private final List<DagNodeWithCondition> childrenWithConditions;
+    private NodeBase defaultChild;
+
+    /**
+     * Create a new decision node with the given name.
+     * @param name The name of the new decision node.
+     */
+    public Decision(final String name) {
+        super(name);
+        this.parent = null;
+        this.childrenWithConditions = new ArrayList<>();
+    }
+
+    /**
+     * Returns the parent of this node.
+     * @return The parent of this node.
+     */
+    public NodeBase getParent() {
+        return parent;
+    }
+
+    /**
+     * Adds the provided node as a parent of this node.
+     * @param parent The new parent of this node.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParent(final NodeBase parent) {
+        Preconditions.checkState(this.parent == null, "Decision nodes cannot have multiple parents.");
+
+        this.parent = parent;
+        this.parent.addChild(this);
+    }
+
+    /**
+     * Adds the provided node as a conditional parent of this node.
+     * @param parent The new conditional parent of this node.
+     * @param condition The condition which must be true in addition to the parent completing successfully for this node
+     *                  to be executed.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParentWithCondition(final Decision parent, final Condition condition) {
+        Preconditions.checkState(this.parent == null, "Decision nodes cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addChildWithCondition(this, condition);
+    }
+
+    /**
+     * Adds the provided node as the default conditional parent of this node.
+     * @param parent The new conditional parent of this node.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParentDefaultConditional(final Decision parent) {
+        Preconditions.checkState(this.parent == null, "Decision nodes cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addDefaultChild(this);
+    }
+
+    @Override
+    public void removeParent(final NodeBase parent) {
+        Preconditions.checkArgument(this.parent == parent, "Trying to remove a nonexistent parent.");
+
+        if (this.parent != null) {
+            this.parent.removeChild(this);
+        }
+
+        this.parent = null;
+    }
+
+    @Override
+    public void clearParents() {
+        removeParent(parent);
+    }
+
+    @Override
+    public List<NodeBase> getChildren() {
+        final List<NodeBase> results = new ArrayList<>();
+
+        for (final DagNodeWithCondition nodeWithCondition : getChildrenWithConditions()) {
+            results.add(nodeWithCondition.getNode());
+        }
+
+        return Collections.unmodifiableList(results);
+    }
+
+    /**
+     * Returns the children of this {@link Decision} node together with their conditions (all children are conditional),
+     * including the default child.
+     * @return The conditional children of this {@link Decision} node together with their conditions,
+     *         including the default child.
+     */
+    public List<DagNodeWithCondition> getChildrenWithConditions() {
+        final List<DagNodeWithCondition> results = new ArrayList<>(childrenWithConditions);
+
+        if (defaultChild != null) {
+            results.add(new DagNodeWithCondition(defaultChild, Condition.defaultCondition()));
+        }
+
+        return Collections.unmodifiableList(results);
+    }
+
+    /**
+     * Returns the default child of this {@code Decision} node.
+     * @return The default child of this {@code Decision} node.
+     */
+    public NodeBase getDefaultChild() {
+        return defaultChild;
+    }
+
+    @Override
+    protected void addChild(final NodeBase child) {
+        throw new IllegalStateException("Decision nodes cannot have normal children.");
+    }
+
+    void addChildWithCondition(final NodeBase child, final Condition condition) {
+        if (condition.isDefault()) {
+            addDefaultChild(child);
+        }
+        else {
+            this.childrenWithConditions.add(new DagNodeWithCondition(child, condition));
+        }
+    }
+
+    void addDefaultChild(final NodeBase child) {
+        Preconditions.checkState(defaultChild == null, "Trying to add a default child to a Decision node that already has one.");
+
+        defaultChild = child;
+    }
+
+    @Override
+    protected void removeChild(final NodeBase child) {
+        if (defaultChild == child) {
+            defaultChild = null;
+        }
+        else {
+            final int index = indexOfNodeBaseInChildrenWithConditions(child);
+
+            Preconditions.checkArgument(index >= 0, "Trying to remove a nonexistent child.");
+
+            this.childrenWithConditions.remove(index);
+        }
+    }
+
+    private int indexOfNodeBaseInChildrenWithConditions(final NodeBase child) {
+        for (int i = 0; i < this.childrenWithConditions.size(); ++i) {
+            if (child == this.childrenWithConditions.get(i).getNode()) {
+                return i;
+            }
+        }
+
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/DecisionJoin.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/DecisionJoin.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/DecisionJoin.java
new file mode 100644
index 0000000..c96753e
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/DecisionJoin.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.dag;
+
+/**
+ * This class represents a joining point where two or more (but not necessarily all) conditional branches originating
+ * from the same decision node meet.
+ * This class will NOT be mapped to JAXB classes and XML as decision nodes don't need to be joined in Oozie, this class
+ * only exists to make implementing the algorithms easier.
+ */
+public class DecisionJoin extends JoiningNodeBase<Decision> {
+
+    /**
+     * Creates a new {@link DecisionJoin} object.
+     * @param name The name of the new decision object.
+     * @param decision The {@link Decision} node that this {@link DecisionJoin} node closes.
+     */
+    public DecisionJoin(final String name, final Decision decision) {
+        super(name, decision);
+    }
+
+    public NodeBase getFirstNonDecisionJoinDescendant() {
+        NodeBase descendant = getChild();
+
+        while (descendant != null) {
+            if (!(descendant instanceof DecisionJoin)) {
+                return descendant;
+            }
+
+            descendant = ((DecisionJoin) descendant).getChild();
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/End.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/End.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/End.java
new file mode 100644
index 0000000..0a28eb3
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/End.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.dag;
+
+import com.google.common.base.Preconditions;
+import org.apache.oozie.fluentjob.api.Condition;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A class representing end nodes in an Oozie workflow definition DAG. These nodes are generated automatically,
+ * the end user should not need to use this class directly.
+ */
+public class End extends NodeBase {
+    private NodeBase parent;
+
+    /**
+     * Create a new end node with the given name.
+     * @param name The name of the new end node.
+     */
+    public End(final String name) {
+        super(name);
+    }
+
+    /**
+     * Returns the parent of this node.
+     * @return The parent of this node.
+     */
+    public NodeBase getParent() {
+        return parent;
+    }
+
+    /**
+     * Adds the provided node as a parent of this node.
+     * @param parent The new parent of this node.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParent(final NodeBase parent) {
+        Preconditions.checkState(this.parent == null, "End nodes cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addChild(this);
+    }
+
+    /**
+     * Adds the provided node as a conditional parent of this node.
+     * @param parent The new conditional parent of this node.
+     * @param condition The condition which must be true in addition to the parent completing successfully for this node
+     *                  to be executed.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParentWithCondition(final Decision parent, final Condition condition) {
+        Preconditions.checkState(this.parent == null, "End nodes cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addChildWithCondition(this, condition);
+    }
+
+    /**
+     * Adds the provided node as the default conditional parent of this node.
+     * @param parent The new conditional parent of this node.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParentDefaultConditional(Decision parent) {
+        Preconditions.checkState(this.parent == null, "End nodes cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addDefaultChild(this);
+    }
+
+    @Override
+    public void removeParent(final NodeBase parent) {
+        Preconditions.checkArgument(this.parent == parent, "Trying to remove a nonexistent parent.");
+
+        if (this.parent != null) {
+            this.parent.removeChild(this);
+        }
+
+        this.parent = null;
+    }
+
+    @Override
+    public void clearParents() {
+        removeParent(parent);
+    }
+
+    @Override
+    public List<NodeBase> getChildren() {
+        return Arrays.asList();
+    }
+
+    @Override
+    protected void addChild(final NodeBase child) {
+        throw new IllegalStateException("End nodes cannot have children.");
+    }
+
+    @Override
+    protected void removeChild(final NodeBase child) {
+        throw new IllegalStateException("End nodes cannot have children.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/ExplicitNode.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/ExplicitNode.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/ExplicitNode.java
new file mode 100644
index 0000000..ae16a53
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/ExplicitNode.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.dag;
+
+import com.google.common.base.Preconditions;
+import org.apache.oozie.fluentjob.api.action.Node;
+import org.apache.oozie.fluentjob.api.Condition;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A class representing action nodes in an Oozie workflow definition DAG. These are the nodes in the intermediate graph
+ * representation that correspond to the nodes that are explicitly defined by the user.
+ */
+public class ExplicitNode extends NodeBase {
+    private NodeBase parent;
+    private NodeBase child;
+    private final Node realNode;
+
+    /**
+     * Create a new explicit node with the given name.
+     * @param name The name of the new explicit node.
+     * @param realNode The API level {@link Node} object defined by the end user.
+     */
+    public ExplicitNode(final String name, final Node realNode) {
+        super(name);
+        this.realNode = realNode;
+    }
+
+    /**
+     * Returns the API level {@link Node} object defined by the end user.
+     * @return The API level {@link Node} object defined by the end user.
+     */
+    public Node getRealNode() {
+        return realNode;
+    }
+
+    /**
+     * Returns the parent of this node.
+     * @return The parent of this node.
+     */
+    public NodeBase getParent() {
+        return parent;
+    }
+
+    /**
+     * Returns the child of this node.
+     * @return The child of this node.
+     */
+    public NodeBase getChild() {
+        return child;
+    }
+
+    /**
+     * Adds the provided node as a parent of this node.
+     * @param parent The new parent of this node.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParent(final NodeBase parent) {
+        Preconditions.checkState(this.parent == null, "An explicit node cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addChild(this);
+    }
+
+    /**
+     * Adds the provided node as a conditional parent of this node.
+     * @param parent The new conditional parent of this node.
+     * @param condition The condition which must be true in addition to the parent completing successfully for this node
+     *                  to be executed.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParentWithCondition(final Decision parent, final Condition condition) {
+        Preconditions.checkState(this.parent == null, "An explicit node cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addChildWithCondition(this, condition);
+    }
+
+    /**
+     * Adds the provided node as the default conditional parent of this node.
+     * @param parent The new conditional parent of this node.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParentDefaultConditional(Decision parent) {
+        Preconditions.checkState(this.parent == null, "An explicit node cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addDefaultChild(this);
+    }
+
+    @Override
+    public void removeParent(final NodeBase parent) {
+        Preconditions.checkArgument(this.parent == parent, "Trying to remove a nonexistent parent.");
+
+        if (this.parent != null) {
+            this.parent.removeChild(this);
+        }
+
+        this.parent = null;
+    }
+
+    @Override
+    public void clearParents() {
+        removeParent(parent);
+    }
+
+    @Override
+    public List<NodeBase> getChildren() {
+        if (child == null) {
+            return Arrays.asList();
+        }
+
+        return Arrays.asList(child);
+    }
+
+    @Override
+    protected void addChild(final NodeBase child) {
+        Preconditions.checkState(this.child == null, "Normal nodes cannot have multiple children.");
+
+        this.child = child;
+    }
+
+    @Override
+    protected void removeChild(final NodeBase child) {
+        Preconditions.checkArgument(this.child == child, "Trying to remove a nonexistent child.");
+
+        this.child = null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Fork.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Fork.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Fork.java
new file mode 100644
index 0000000..2657e63
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Fork.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.dag;
+
+import com.google.common.base.Preconditions;
+import org.apache.oozie.fluentjob.api.Condition;
+import org.apache.oozie.fluentjob.api.ModifyOnce;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A class representing fork nodes in an Oozie workflow definition DAG. These nodes are generated automatically,
+ * the end user should not need to use this class directly.
+ */
+public class Fork extends NodeBase {
+    private NodeBase parent;
+    private final List<NodeBase> children;
+
+    private final ModifyOnce<Join> closingJoin;
+
+    /**
+     * Create a new fork node with the given name.
+     * @param name The name of the new fork node.
+     */
+    public Fork(final String name) {
+        super(name);
+
+        this.parent = null;
+        this.children = new ArrayList<>();
+        this.closingJoin = new ModifyOnce<>();
+    }
+
+    /**
+     * Returns the parent of this node.
+     * @return The parent of this node.
+     */
+    public NodeBase getParent() {
+        return parent;
+    }
+
+    /**
+     * Adds the provided node as a parent of this node.
+     * @param parent The new parent of this node.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParent(final NodeBase parent) {
+        Preconditions.checkState(this.parent == null, "Fork nodes cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addChild(this);
+    }
+
+    /**
+     * Adds the provided node as a conditional parent of this node.
+     * @param parent The new conditional parent of this node.
+     * @param condition The condition which must be true in addition to the parent completing successfully for this node
+     *                  to be executed.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParentWithCondition(final Decision parent, final Condition condition) {
+        Preconditions.checkState(this.parent == null, "Fork nodes cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addChildWithCondition(this, condition);
+    }
+
+    /**
+     * Adds the provided node as the default conditional parent of this node.
+     * @param parent The new conditional parent of this node.
+     * @throws IllegalStateException if this node already has a parent.
+     */
+    @Override
+    public void addParentDefaultConditional(Decision parent) {
+        Preconditions.checkState(this.parent == null, "Fork nodes cannot have multiple parents.");
+
+        this.parent = parent;
+        parent.addDefaultChild(this);
+    }
+
+    @Override
+    public void removeParent(final NodeBase parent) {
+        Preconditions.checkArgument(this.parent == parent, "Trying to remove a nonexistent parent.");
+
+        if (this.parent != null) {
+            this.parent.removeChild(this);
+        }
+
+        this.parent = null;
+    }
+
+    @Override
+    public void clearParents() {
+        removeParent(parent);
+    }
+
+    @Override
+    public List<NodeBase> getChildren() {
+        return Collections.unmodifiableList(new ArrayList<>(children));
+    }
+
+    Join getClosingJoin() {
+        return closingJoin.get();
+    }
+
+    boolean isClosed() {
+        return getClosingJoin() != null;
+    }
+
+    void close(final Join join) {
+        closingJoin.set(join);
+    }
+
+    @Override
+    protected void addChild(final NodeBase child) {
+        children.add(child);
+    }
+
+    @Override
+    protected void removeChild(final NodeBase child) {
+        Preconditions.checkArgument(this.children.remove(child),"Trying to remove a nonexistent child.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Graph.java
----------------------------------------------------------------------
diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Graph.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Graph.java
new file mode 100644
index 0000000..08cb896
--- /dev/null
+++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Graph.java
@@ -0,0 +1,872 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.fluentjob.api.dag;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.oozie.fluentjob.api.action.Node;
+import org.apache.oozie.fluentjob.api.workflow.Credentials;
+import org.apache.oozie.fluentjob.api.workflow.Global;
+import org.apache.oozie.fluentjob.api.workflow.Parameters;
+import org.apache.oozie.fluentjob.api.workflow.Workflow;
+import org.apache.oozie.fluentjob.api.Condition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The class holding the intermediate representation of the workflow. This is where the API level {@link Workflow}
+ * object is transformed to an intermediate graph (an object of this class), and all control nodes are generated.
+ * This graph is later further transformed to JAXB objects and to xml.
+ *
+ * The conversion from the API level {@link Workflow} object to the intermediate graph is as follows:
+ * We take the nodes in topological order, meaning every node is processed after all of its dependencies
+ * have been processed. There are two main possibilities when processing a node:
+ *      - the node has zero or one parent
+ *      - the node has at least two parents.
+ *
+ * In the first case, we simply add the converted node as a child to its parent (or the start node if there are none),
+ * possibly inserting a fork if the parent already has children, or using a pre-existing fork if a parent already
+ * has one.
+ *
+ * In the second case, we have to insert a join. We first check if we can join all incoming paths in a single join
+ * node or if we have to split them up because they come from multiple embedded forks. It is also possible that some
+ * incoming paths originate from the same fork but that fork has other outgoing paths as well. In that case we split
+ * the fork up into multiple embedded forks.
+ *
+ * After this, we examine all paths that we are going to join and look for side branches that lead out of the
+ * fork / join block, violating Oozie's constraints. If these are non-conditional branches, we simply cut them down
+ * from their original parents and put them under the new join (and possibly under a fork), and make them siblings
+ * of whatever nodes originally come after the join. This way all original dependencies are preserved, as the original
+ * parents will still be ancestors (though indirectly) to the relocated nodes, but new dependencies are introduced.
+ * This preserves the correctness of the workflow but decreases its parallelism. This is unfortunate but Oozie's graph
+ * format is more restrictive than a general DAG, so we have to accept it.
+ *
+ * If the side branches are conditional, we cut above the decision node and insert a join there. We reinsert the
+ * decision node under the new join. This is very similar to the handling of non-conditional paths, but it
+ * decreases parallelism even more (we cut one level higher).
+ * A problem occurs if two or more decision nodes come right after the fork that we want to close. If we cut above
+ * the decision nodes as usual we gain nothing, because we insert a join and a fork and arrive at the same situation
+ * as before - multiple decision nodes under a fork. Currently, we are not able to handle this situation and we throw
+ * an exception.
+ */
+public class Graph {
+    private final String name;
+    private final Start start = new Start("start");
+    private final End end = new End("end");
+    private final Parameters parameters;
+    private final Global global;
+    private final Credentials credentials;
+    private final Map<String, NodeBase> nodesByName = new LinkedHashMap<>();
+    private final Map<Fork, Integer> forkNumbers = new HashMap<>();
+    private int forkCounter = 1;
+
+    private final Map<NodeBase, Decision> originalParentToCorrespondingDecision = new HashMap<>();
+    private final Map<Decision, Integer> closedPathsOfDecisionNodes = new HashMap<>();
+    private int decisionCounter = 1;
+    private int decisionJoinCounter = 1;
+
+    /**
+     * Nodes that have a join downstream of them are closed; they should never get new children.
+     */
+    private final Map<NodeBase, Join> closingJoins = new HashMap<>();
+
+    /**
+     * Creates a new {@link Graph} object transforming the graph of the provided {@link Workflow} object
+     * into an intermediate level graph.
+     * @param workflow The {@link Workflow} object to transform.
+     */
+    public Graph(final Workflow workflow) {
+        this.name = workflow.getName();
+        this.parameters = workflow.getParameters();
+        this.global = workflow.getGlobal();
+        this.credentials = workflow.getCredentials();
+
+        // Topological order guarantees every node is processed after all of its dependencies.
+        final List<Node> nodesFromRootsToLeaves = getNodesFromRootsToLeaves(workflow);
+
+        // Register the mandatory control nodes first so their names are reserved.
+        storeNode(start);
+        storeNode(end);
+
+        convert(nodesFromRootsToLeaves);
+    }
+
+    /**
+     * Returns the name of this graph.
+     * @return The name of this graph.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Returns the workflow level parameters taken over from the original {@link Workflow}.
+     * @return The {@link Parameters} of this graph.
+     */
+    public Parameters getParameters() {
+        return parameters;
+    }
+
+    /**
+     * Returns the global settings taken over from the original {@link Workflow}.
+     * @return The {@link Global} section of this graph.
+     */
+    public Global getGlobal() {
+        return global;
+    }
+
+    /**
+     * Returns the start node of this graph.
+     * @return The start node of this graph.
+     */
+    public Start getStart() {
+        return start;
+    }
+
+    /**
+     * Returns the end node of this graph.
+     * @return The end node of this graph.
+     */
+    public End getEnd() {
+        return end;
+    }
+
+    /**
+     * Returns the node with the given name in this graph if it exists, {@code null} otherwise.
+     * @param name The name of the node that will be returned.
+     * @return The node with the given name in this graph if it exists, {@code null} otherwise.
+     */
+    public NodeBase getNodeByName(final String name) {
+        return nodesByName.get(name);
+    }
+
+    /**
+     * Returns a collection of the nodes in this graph.
+     * @return A collection of the nodes in this graph.
+     */
+    public Collection<NodeBase> getNodes() {
+        return nodesByName.values();
+    }
+
+    // Transforms the API-level nodes (already in topological order) into the intermediate graph,
+    // inserting decision / fork / join control nodes as needed, then attaches the end node.
+    private void convert(final List<Node> nodesInTopologicalOrder) {
+        final Map<Node, NodeBase> nodeToNodeBase = new HashMap<>();
+
+        for (final Node originalNode : nodesInTopologicalOrder) {
+            final ExplicitNode convertedNode = new ExplicitNode(originalNode.getName(), originalNode);
+            nodeToNodeBase.put(originalNode, convertedNode);
+            storeNode(convertedNode);
+
+            // If the original node has conditional children, a decision node is inserted below it.
+            checkAndInsertDecisionNode(originalNode, convertedNode);
+
+            final List<DagNodeWithCondition> mappedParentsWithConditions = findMappedParents(originalNode, nodeToNodeBase);
+
+            handleNodeWithParents(convertedNode, mappedParentsWithConditions);
+        }
+
+        // Every node that ends up without children is joined into the end node.
+        final List<DagNodeWithCondition> finalNodes = findFinalNodes();
+
+        handleNodeWithParents(end, finalNodes);
+    }
+
+    /**
+     * Inserts a {@link Decision} node below {@code convertedNode} if the original node has
+     * conditional children, and records the mapping so the children are later attached to the
+     * decision node instead of the original parent.
+     * @param originalNode The API-level node that is being converted.
+     * @param convertedNode The already converted counterpart of {@code originalNode}.
+     */
+    private void checkAndInsertDecisionNode(final Node originalNode, final ExplicitNode convertedNode) {
+        if (!originalNode.getChildrenWithConditions().isEmpty()) {
+            // We insert a decision node below the current convertedNode.
+            final Decision decision = newDecision();
+            decision.addParent(convertedNode);
+            originalParentToCorrespondingDecision.put(convertedNode, decision);
+        }
+    }
+
+    /**
+     * Maps the parents of {@code originalNode} to their already converted counterparts, keeping
+     * the conditions of conditional parents and using {@code null} as the condition of
+     * unconditional ones.
+     * @param originalNode The API-level node whose parents are looked up.
+     * @param nodeToNodeBase Mapping from the original nodes to the converted ones.
+     * @return The converted parents together with their conditions.
+     */
+    private List<DagNodeWithCondition> findMappedParents(final Node originalNode, final Map<Node, NodeBase> nodeToNodeBase) {
+        final List<DagNodeWithCondition> mappedParentsWithConditions = new ArrayList<>();
+
+        for (final Node.NodeWithCondition parentNodeWithCondition : originalNode.getParentsWithConditions()) {
+            final NodeBase mappedParentNode = nodeToNodeBase.get(parentNodeWithCondition.getNode());
+            final Condition condition = parentNodeWithCondition.getCondition();
+            final DagNodeWithCondition parentNodeBaseWithCondition = new DagNodeWithCondition(mappedParentNode, condition);
+            mappedParentsWithConditions.add(parentNodeBaseWithCondition);
+        }
+
+        for (final Node parent : originalNode.getParentsWithoutConditions()) {
+            mappedParentsWithConditions.add(new DagNodeWithCondition(nodeToNodeBase.get(parent), null));
+        }
+
+        return mappedParentsWithConditions;
+    }
+
+    // Collects the nodes that currently have no children (excluding the end node itself);
+    // these become the parents of the end node.
+    private List<DagNodeWithCondition> findFinalNodes() {
+        final List<DagNodeWithCondition> finalNodes = new ArrayList<>();
+
+        for (final NodeBase maybeFinalNode : nodesByName.values()) {
+            final boolean hasNoChildren = maybeFinalNode.getChildren().isEmpty();
+            final boolean isNotEnd = maybeFinalNode != end;
+            if (hasNoChildren && isNotEnd) {
+                finalNodes.add(new DagNodeWithCondition(maybeFinalNode, null));
+            }
+        }
+
+        return finalNodes;
+    }
+
+    // Registers a node by name, rejecting duplicate names within the same graph.
+    private void storeNode(final NodeBase node) {
+        final String name = node.getName();
+
+        final boolean isPresent = nodesByName.containsKey(name);
+        if (isPresent) {
+            final String errorMessage = String.format("Duplicate name '%s' found in graph '%s'", node.getName(), this.getName());
+            throw new IllegalArgumentException(errorMessage);
+        }
+
+        nodesByName.put(node.getName(), node);
+    }
+
+    // Redirects an original parent to the node new children should really be attached to:
+    // first to its corresponding decision node (if one was inserted below it), then to the
+    // nearest descendant that is not enclosed in an already closed fork / join pair.
+    private NodeBase getNewParent(final NodeBase originalParent) {
+        NodeBase newParent = originalParent;
+
+        if (originalParentToCorrespondingDecision.containsKey(newParent)) {
+            newParent = originalParentToCorrespondingDecision.get(newParent);
+        }
+
+        newParent = getNearestNonClosedDescendant(newParent);
+
+        return newParent;
+    }
+
+    // Attaches 'node' under its (possibly redirected) parents: zero parents means it hangs
+    // under the start node, one parent is a simple insertion, multiple parents need a join.
+    private void handleNodeWithParents(final NodeBase node, final List<DagNodeWithCondition> parentsWithConditions) {
+        // Avoiding adding children to nodes that are inside a closed fork / join pair and to original parents of decision nodes.
+        final List<DagNodeWithCondition> newParentsWithConditions = new ArrayList<>();
+        for (final DagNodeWithCondition parentWithCondition : parentsWithConditions) {
+            final NodeBase parent = parentWithCondition.getNode();
+            final Condition condition = parentWithCondition.getCondition();
+
+            final NodeBase newParent = getNewParent(parent);
+            final DagNodeWithCondition newParentWithCondition = new DagNodeWithCondition(newParent, condition);
+
+            // Redirection may map several original parents to the same new parent; deduplicate.
+            if (!newParentsWithConditions.contains(newParentWithCondition)) {
+                newParentsWithConditions.add(newParentWithCondition);
+            }
+        }
+
+        if (newParentsWithConditions.isEmpty()) {
+            handleSingleParentNode(new DagNodeWithCondition(start, null), node);
+        }
+        else if (newParentsWithConditions.size() == 1) {
+            handleSingleParentNode(newParentsWithConditions.get(0), node);
+        }
+        else {
+            handleMultiParentNodeWithParents(node, newParentsWithConditions);
+        }
+    }
+
+    // A single parent never needs a join; at most a fork is inserted below the parent.
+    private void handleSingleParentNode(final DagNodeWithCondition parentWithCondition, final NodeBase node) {
+        addParentWithForkIfNeeded(node, parentWithCondition);
+    }
+
+    // At least two parents: find the branching (fork / decision / redundant parent) that has
+    // to be closed first and dispatch accordingly.
+    private void handleMultiParentNodeWithParents(final NodeBase node, final List<DagNodeWithCondition> parentsWithConditions) {
+        final List<PathInformation> paths = new ArrayList<>();
+        for (final DagNodeWithCondition parentWithCondition : parentsWithConditions) {
+            final NodeBase parent = parentWithCondition.getNode();
+            paths.add(getPathInfo(parent));
+        }
+
+        final BranchingToClose toClose = chooseBranchingToClose(paths);
+
+        // Eliminating redundant parents.
+        if (toClose.isRedundantParent()) {
+            final List<DagNodeWithCondition> parentsWithoutRedundant = new ArrayList<>(parentsWithConditions);
+            DagNodeWithCondition.removeFromCollection(parentsWithoutRedundant, toClose.getRedundantParent());
+
+            handleNodeWithParents(node, parentsWithoutRedundant);
+        }
+        else if (toClose.isDecision()) {
+            insertDecisionJoin(node, parentsWithConditions, toClose);
+        }
+        else {
+            insertJoin(parentsWithConditions, node, toClose);
+        }
+    }
+
+    // Closes the given conditional paths with a decision join and attaches 'node' below it,
+    // inserting forks along the way where needed.
+    private void insertDecisionJoin(final NodeBase node,
+                                    final List<DagNodeWithCondition> parentsWithConditions,
+                                    final BranchingToClose branchingToClose) {
+        final Decision decision = branchingToClose.getDecision();
+        final DecisionJoin decisionJoin = newDecisionJoin(decision, branchingToClose.getPaths().size());
+
+        for (final DagNodeWithCondition parentWithCondition : parentsWithConditions) {
+            addParentWithForkIfNeeded(decisionJoin, parentWithCondition);
+        }
+
+        addParentWithForkIfNeeded(node, new DagNodeWithCondition(decisionJoin, null));
+    }
+
+    /**
+     * Joins the given parent paths of {@code node}, splitting the join into multiple
+     * fork / join pairs when only a subset of the paths can be closed together.
+     */
+    private void insertJoin(final List<DagNodeWithCondition> parentsWithConditions,
+                            final NodeBase node,
+                            final BranchingToClose branchingToClose) {
+        if (branchingToClose.isSplittingJoinNeeded()) {
+            // We have to close a subset of the paths: the joined paths are replaced by the new
+            // join in the parent list and the remaining parents are retried recursively.
+            final List<DagNodeWithCondition> newParentsWithConditions = new ArrayList<>(parentsWithConditions);
+
+            for (final PathInformation path : branchingToClose.getPaths()) {
+                DagNodeWithCondition.removeFromCollection(newParentsWithConditions, path.getBottom());
+            }
+
+            final Join newJoin = joinPaths(branchingToClose.getFork(), branchingToClose.getPaths());
+
+            newParentsWithConditions.add(new DagNodeWithCondition(newJoin, null));
+
+            handleMultiParentNodeWithParents(node, newParentsWithConditions);
+        }
+        else {
+            // There are no intermediary fork / join pairs to insert, we have to join all paths in a single join.
+            final Join newJoin = joinPaths(branchingToClose.getFork(), branchingToClose.getPaths());
+
+            if (newJoin != null) {
+                addParentWithForkIfNeeded(node, new DagNodeWithCondition(newJoin, null));
+            }
+            else {
+                // Null means a part of the paths was relocated because of a decision node.
+                handleNodeWithParents(node, parentsWithConditions);
+            }
+        }
+    }
+
+    // Returning null means we have relocated a part of the paths because of decision nodes, so the caller should try
+    // adding the node again.
+    private Join joinPaths(final Fork fork, final List<PathInformation> pathsToJoin) {
+        // For every path, remember the last open decision node found below the fork; the map
+        // entry is overwritten while walking upwards, so the highest such decision wins.
+        final Map<PathInformation, Decision> highestDecisionNodes = new LinkedHashMap<>();
+        for (final PathInformation path : pathsToJoin) {
+            for (int ixNodeOnPath = 0; ixNodeOnPath < path.getNodes().size(); ++ixNodeOnPath) {
+                final NodeBase nodeOnPath = path.getNodes().get(ixNodeOnPath);
+
+                if (nodeOnPath instanceof Decision) {
+                    // Excluding decision nodes where no branch goes out of this fork / join pair.
+                    if (!isDecisionClosed((Decision) nodeOnPath)) {
+                        highestDecisionNodes.put(path, (Decision) nodeOnPath);
+                    }
+                }
+                else if (nodeOnPath == fork) {
+                    break;
+                }
+            }
+        }
+
+        if (highestDecisionNodes.isEmpty()) {
+            return joinPathsWithoutDecisions(fork, pathsToJoin);
+        }
+        else {
+            return joinPathsWithDecisions(fork, pathsToJoin, highestDecisionNodes);
+        }
+    }
+
+    // Joins the given paths in a single join node - dividing the fork first if it has other
+    // children - and relocates side branches that would leave the fork / join pair under the
+    // newly created join.
+    private Join joinPathsWithoutDecisions(final Fork fork, final List<PathInformation> pathsToJoin) {
+        final Set<NodeBase> mainBranchNodes = new LinkedHashSet<>();
+        for (final PathInformation pathInformation : pathsToJoin) {
+            mainBranchNodes.addAll(pathInformation.getNodes());
+        }
+
+        // Taking care of side branches.
+        final Set<NodeBase> closedNodes = new HashSet<>();
+        final List<NodeBase> sideBranches = new ArrayList<>();
+        for (final PathInformation path : pathsToJoin) {
+            for (int ixNodeOnPath = 0; ixNodeOnPath < path.getNodes().size(); ++ixNodeOnPath) {
+                final NodeBase nodeOnPath = path.getNodes().get(ixNodeOnPath);
+
+                if (nodeOnPath == fork) {
+                    break;
+                }
+
+                // Closed decisions have no branches leaving the pair, no need to look further up.
+                if (nodeOnPath instanceof Decision && isDecisionClosed((Decision) nodeOnPath)) {
+                    break;
+                }
+
+                sideBranches.addAll(cutSideBranches(nodeOnPath, mainBranchNodes));
+                closedNodes.add(nodeOnPath);
+            }
+        }
+
+        final Join newJoin;
+
+        // Check if we have to divide the fork.
+        final boolean hasMoreForkedChildren = pathsToJoin.size() < fork.getChildren().size();
+        if (hasMoreForkedChildren) {
+            // Dividing the fork.
+            newJoin = divideForkAndCloseSubFork(fork, pathsToJoin);
+        } else {
+            // We don't divide the fork.
+            newJoin = newJoin(fork);
+
+            for (final PathInformation path : pathsToJoin) {
+                addParentWithForkIfNeeded(newJoin, new DagNodeWithCondition(path.getBottom(), null));
+            }
+        }
+
+        // Inserting the side branches under the new join node.
+        for (final NodeBase sideBranch : sideBranches) {
+            addParentWithForkIfNeeded(sideBranch, new DagNodeWithCondition(newJoin, null));
+        }
+
+        // Marking the nodes as closed.
+        for (final NodeBase closedNode : closedNodes) {
+            markAsClosed(closedNode, newJoin);
+        }
+
+        return newJoin;
+    }
+
+    // Cuts the paths above their highest open decision nodes, joins the truncated paths, and
+    // reinserts the decisions under the resulting join. Always returns null so that, per the
+    // joinPaths contract, the caller retries attaching the node with the relocated parents.
+    private Join joinPathsWithDecisions(final Fork fork,
+                                        final List<PathInformation> pathsToJoin,
+                                        final Map<PathInformation, Decision> highestDecisionNodes) {
+        final Set<Decision> decisions = new HashSet<>(highestDecisionNodes.values());
+
+        final List<PathInformation> newPaths = new ArrayList<>();
+        boolean shouldCloseJoinAndAddOtherDecisionsUnderIt = false;
+        for (final Decision decision : decisions) {
+            final NodeBase parentOfDecision = decision.getParent();
+
+            // A decision sitting directly under the fork cannot be cut above - see the class javadoc.
+            if (parentOfDecision == fork) {
+                shouldCloseJoinAndAddOtherDecisionsUnderIt = true;
+                break;
+            }
+
+            newPaths.add(getPathInfo(parentOfDecision));
+            removeParentWithForkIfNeeded(decision, decision.getParent());
+        }
+
+        if (shouldCloseJoinAndAddOtherDecisionsUnderIt) {
+            closeJoinAndAddOtherDecisionsUnderIt(fork, decisions);
+        }
+        else {
+            for (final PathInformation path : pathsToJoin) {
+                if (!highestDecisionNodes.containsKey(path)) {
+                    newPaths.add(path);
+                }
+            }
+
+            final Join newJoin = joinPaths(fork, newPaths);
+
+            for (final Decision decision : decisions) {
+                addParentWithForkIfNeeded(decision, new DagNodeWithCondition(newJoin, null));
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Would close the fork with a join and re-add the decisions below it; this transformation
+     * is not implemented yet.
+     * @param fork The fork whose paths should be closed.
+     * @param decisions The decision nodes sitting directly under the fork.
+     * @throws IllegalStateException always, as this case cannot be converted yet.
+     */
+    private void closeJoinAndAddOtherDecisionsUnderIt(final Fork fork, final Set<Decision> decisions) {
+        // TODO: Implement this transformation instead of rejecting the workflow.
+        throw new IllegalStateException(String.format(
+                "Conditional paths originating ultimately from the same parallel branching (fork '%s') " +
+                "do not converge to the same join.", fork.getName()));
+    }
+
+    // Records that 'node' is closed by 'join'; closed nodes must never get new children.
+    private void markAsClosed(final NodeBase node, final Join join) {
+        closingJoins.put(node, join);
+    }
+
+    // Detaches every child of 'node' that is not part of the main branches (the paths being
+    // joined) and returns the detached children so they can be reinserted under the new join.
+    private List<NodeBase> cutSideBranches(final NodeBase node,
+                                           final Set<NodeBase> mainBranchNodes) {
+        final List<NodeBase> sideBranches = new ArrayList<>();
+
+        // Closed forks cannot have side branches.
+        final boolean isClosedFork = node instanceof Fork && ((Fork) node).isClosed();
+        if (!isClosedFork) {
+            for (final NodeBase childOfForkOrParent : node.getChildren()) {
+                if (!mainBranchNodes.contains(childOfForkOrParent)) {
+                    removeParentWithForkIfNeeded(childOfForkOrParent, node);
+                    sideBranches.add(childOfForkOrParent);
+                }
+            }
+        }
+
+        return sideBranches;
+    }
+
+    // Splits 'correspondingFork': a new fork is inserted below it, the given paths are moved
+    // under the new fork and closed with a join, while the fork's other children stay in place.
+    private Join divideForkAndCloseSubFork(final Fork correspondingFork,
+                                           final List<PathInformation> paths) {
+        final Fork newFork = newFork();
+        for (final PathInformation path : paths) {
+            // The node directly below the original fork is relocated under the new fork.
+            final int indexOfFork = path.getNodes().indexOf(correspondingFork);
+            final NodeBase childOfOriginalFork = path.getNodes().get(indexOfFork - 1);
+
+            childOfOriginalFork.removeParent(correspondingFork);
+            childOfOriginalFork.addParent(newFork);
+        }
+
+        newFork.addParent(correspondingFork);
+
+        final Join newJoin = newJoin(newFork);
+
+        for (final PathInformation path : paths) {
+            newJoin.addParent(path.getBottom());
+        }
+
+        return newJoin;
+    }
+
+    // Scans the paths level by level (starting from the bottom of each path) and returns the
+    // first branching point shared by at least two of them. A shared branching is expected to
+    // exist; not finding one indicates a bug, hence the IllegalStateException.
+    private BranchingToClose chooseBranchingToClose(final List<PathInformation> paths) {
+        int maxPathLength = 0;
+        for (final PathInformation pathInformation : paths) {
+            if (maxPathLength < pathInformation.getNodes().size()) {
+                maxPathLength = pathInformation.getNodes().size();
+            }
+        }
+
+        for (int ixLevel = 0; ixLevel < maxPathLength; ++ixLevel) {
+            final BranchingToClose foundAtThisLevel = chooseBranchingToClose(paths, ixLevel);
+
+            if (foundAtThisLevel != null) {
+                return foundAtThisLevel;
+            }
+        }
+
+        throw new IllegalStateException("We should never reach here.");
+    }
+
+    // Looks for a node at depth 'ixLevel' that occurs on at least two of the paths and
+    // classifies it as a fork, a decision or a redundant parent; returns null if the paths
+    // do not meet at this level.
+    private BranchingToClose chooseBranchingToClose(final List<PathInformation> paths, final int ixLevel) {
+        for (final PathInformation path : paths) {
+            if (ixLevel < path.getNodes().size()) {
+                final NodeBase branching = path.getNodes().get(ixLevel);
+
+                final List<PathInformation> pathsMeetingAtCurrentFork = getPathsContainingNode(branching, paths);
+
+                if (pathsMeetingAtCurrentFork.size() > 1) {
+                    // If only a subset of the paths meet here, the join has to be split.
+                    final boolean needToSplitJoin = pathsMeetingAtCurrentFork.size() < paths.size();
+
+                    // If branching is not a Fork or a Decision, then it is a redundant parent.
+                    if (branching instanceof Fork) {
+                        return BranchingToClose.withFork((Fork) branching, pathsMeetingAtCurrentFork, needToSplitJoin);
+                    } else if (branching instanceof Decision) {
+                        return BranchingToClose.withDecision((Decision) branching, pathsMeetingAtCurrentFork, needToSplitJoin);
+                    }
+                    else {
+                        return BranchingToClose.withRedundantParent(branching, pathsMeetingAtCurrentFork, needToSplitJoin);
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    // Filters the given paths to those that pass through 'node'.
+    private List<PathInformation> getPathsContainingNode(final NodeBase node, final List<PathInformation> paths) {
+        final List<PathInformation> pathsContainingNode = new ArrayList<>();
+
+        for (final PathInformation pathInformationMaybeContaining : paths) {
+            if (pathInformationMaybeContaining.getNodes().contains(node)) {
+                pathsContainingNode.add(pathInformationMaybeContaining);
+            }
+        }
+
+        return pathsContainingNode;
+    }
+
+    // Walks upwards from 'node' towards the start node, collecting the visited nodes bottom-up
+    // (the start node itself is excluded). At a join or decision join the walk jumps to its
+    // branching pair, so the interior of closed pairs is not recorded.
+    private PathInformation getPathInfo(final NodeBase node) {
+        NodeBase current = node;
+
+        final List<NodeBase> nodes = new ArrayList<>();
+
+        while (current != start) {
+            nodes.add(current);
+
+            if (current instanceof Join) {
+                // Get the fork pair of this join and go towards that
+                final Fork forkPair = ((Join) current).getBranchingPair();
+                current = forkPair;
+            }
+            else if (current instanceof DecisionJoin) {
+                final Decision decisionPair = ((DecisionJoin) current).getBranchingPair();
+                current = decisionPair;
+            }
+            else {
+                current = getSingleParent(current);
+            }
+        }
+
+        return new PathInformation(nodes);
+    }
+
+    // A decision is closed when decision joins have already closed all of its child paths.
+    private boolean isDecisionClosed(final Decision decision) {
+        final Integer closedPathsOfDecisionNode = closedPathsOfDecisionNodes.get(decision);
+        return closedPathsOfDecisionNode != null && decision.getChildren().size() == closedPathsOfDecisionNode;
+    }
+
+    // Returns the unique parent of 'node', dispatching on the concrete node type.
+    // Join nodes are expected to have exactly one parent by the time this is called.
+    private NodeBase getSingleParent(final NodeBase node) {
+        if (node instanceof End) {
+            return ((End) node).getParent();
+        }
+        else if (node instanceof Fork) {
+            return ((Fork) node).getParent();
+        }
+        else if (node instanceof Decision) {
+            return ((Decision) node).getParent();
+        }
+        else if (node instanceof ExplicitNode) {
+            return ((ExplicitNode) node).getParent();
+        }
+        else if (node instanceof Start) {
+            throw new IllegalStateException("Start nodes have no parent.");
+        }
+        else if (node instanceof Join) {
+            final Join join = (Join) node;
+            final int numberOfParents = join.getParents().size();
+            if (numberOfParents != 1) {
+                throw new IllegalStateException("The join node called '" + node.getName()
+                        + "' has " + numberOfParents + " parents instead of 1.");
+            }
+
+            return join.getParents().get(0);
+        }
+        else if (node == null) {
+            throw new IllegalArgumentException("Null node found.");
+        }
+
+        throw new IllegalArgumentException("Unknown node type.");
+    }
+
+    // Returns the first descendant that is not inside a closed fork / join pair.
+    // Follows the chain of closing joins downwards until an open node is reached.
+    private NodeBase getNearestNonClosedDescendant(final NodeBase node) {
+        NodeBase current = node;
+
+        while (closingJoins.containsKey(current)) {
+            current = closingJoins.get(current);
+        }
+
+        return current;
+    }
+
+    // Makes 'parent' a parent of 'node'. Since ordinary nodes may have only one child, a fork
+    // is inserted (or an existing fork below the parent is reused) when the parent already has
+    // a child. Conditional parents must be decision nodes.
+    private void addParentWithForkIfNeeded(final NodeBase node, final DagNodeWithCondition parentWithCondition) {
+        final NodeBase parent = parentWithCondition.getNode();
+        final Condition condition = parentWithCondition.getCondition();
+        if (parent.getChildren().isEmpty() || parent instanceof Fork || parent instanceof Decision) {
+            if (condition != null) {
+                if (!(parent instanceof Decision)) {
+                    throw new IllegalStateException("Trying to add a conditional parent that is not a decision.");
+                }
+
+                node.addParentWithCondition((Decision) parent, condition);
+            }
+            else {
+                node.addParent(parent);
+            }
+        }
+        else {
+            // If there is no child, we never get to this point.
+            // There is only one child, otherwise it is a fork and we don't get here.
+            final NodeBase child = parent.getChildren().get(0);
+
+            if (child instanceof Fork) {
+                // Reuse the fork that already hangs below the parent.
+                node.addParent(child);
+            }
+            else if (child instanceof Join) {
+                // The node is attached below the join instead of becoming its sibling.
+                addParentWithForkIfNeeded(node, new DagNodeWithCondition(child, null));
+            }
+            else {
+                // Insert a new fork between the parent and its two children.
+                final Fork newFork = newFork();
+
+                child.removeParent(parent);
+                child.addParent(newFork);
+                node.addParent(newFork);
+                newFork.addParent(parent);
+            }
+        }
+    }
+
+    // Removes 'parent' from 'node'; if this leaves a fork with a single child, that fork is
+    // redundant, so it is dissolved (its child is reattached to the grandparent) and removed
+    // from the graph.
+    private void removeParentWithForkIfNeeded(final NodeBase node, final NodeBase parent) {
+        node.removeParent(parent);
+
+        final boolean isParentForkAndHasOneChild = parent instanceof Fork && parent.getChildren().size() == 1;
+        if (isParentForkAndHasOneChild) {
+            final NodeBase grandparent = ((Fork) parent).getParent();
+            final NodeBase child = parent.getChildren().get(0);
+
+            removeParentWithForkIfNeeded(parent, grandparent);
+            child.removeParent(parent);
+            child.addParent(grandparent);
+            nodesByName.remove(parent.getName());
+        }
+    }
+
+    // Creates and registers a new fork, recording its number so that the closing join can
+    // share it (fork1 is closed by join1 and so on).
+    private Fork newFork() {
+        final Fork fork = new Fork("fork" + forkCounter);
+
+        forkNumbers.put(fork, forkCounter);
+        forkCounter++;
+
+        storeNode(fork);
+
+        return fork;
+    }
+
+    // Creates and registers the join that closes 'correspondingFork', named after the fork's number.
+    private Join newJoin(final Fork correspondingFork) {
+        final Join join = new Join("join" + forkNumbers.get(correspondingFork), correspondingFork);
+
+        storeNode(join);
+
+        return join;
+    }
+
+    // Creates and registers a new decision node with a unique, generated name.
+    private Decision newDecision() {
+        final Decision decision = new Decision("decision" + decisionCounter);
+
+        decisionCounter++;
+
+        storeNode(decision);
+
+        return decision;
+    }
+
+    // Creates and registers a new decision join closing some paths of 'correspondingDecision',
+    // accumulating the total number of closed paths (consumed by isDecisionClosed).
+    private DecisionJoin newDecisionJoin(final Decision correspondingDecision, final int numberOfPathsClosed) {
+        final DecisionJoin decisionJoin = new DecisionJoin("decisionJoin" + decisionJoinCounter, correspondingDecision);
+
+        final Integer numberOfAlreadyClosedChildren = closedPathsOfDecisionNodes.get(correspondingDecision);
+        final int newNumber = numberOfPathsClosed + (numberOfAlreadyClosedChildren == null ? 0 : numberOfAlreadyClosedChildren);
+
+        closedPathsOfDecisionNodes.put(correspondingDecision, newNumber);
+
+        decisionJoinCounter++;
+
+        storeNode(decisionJoin);
+
+        return decisionJoin;
+    }
+
+    /**
+     * Returns the nodes of the workflow in topological order: roots first, and every node only
+     * after all of its parents. A child whose dependencies are not all processed yet is picked
+     * up in a later iteration, once its last parent has been added. A set mirrors the list so
+     * membership checks avoid repeated linear scans of the list.
+     * @param workflow The workflow whose nodes are ordered.
+     * @return The nodes in topological order.
+     */
+    private static List<Node> getNodesFromRootsToLeaves(final Workflow workflow) {
+        final List<Node> nodes = new ArrayList<>(workflow.getRoots());
+        // Mirror of 'nodes' for constant-time membership checks.
+        final Set<Node> nodeSet = new HashSet<>(nodes);
+
+        for (int i = 0; i < nodes.size(); ++i) {
+            final Node current  = nodes.get(i);
+
+            for (final Node child : current.getAllChildren()) {
+                // Checking if every dependency has been processed, if not, we do not add the node to the list.
+                final List<Node> dependencies = child.getAllParents();
+                if (nodeSet.containsAll(dependencies) && !nodeSet.contains(child)) {
+                    nodes.add(child);
+                    nodeSet.add(child);
+                }
+            }
+        }
+
+        return nodes;
+    }
+
+    public Credentials getCredentials() {
+        return credentials;
+    }
+
+    private static class PathInformation {
+        private final ImmutableList<NodeBase> nodes;
+
+        PathInformation(final List<NodeBase> nodes) {
+            this.nodes = new ImmutableList.Builder<NodeBase>().addAll(nodes).build();
+        }
+
+        NodeBase getBottom() {
+            return nodes.get(0);
+        }
+
+        public List<NodeBase> getNodes() {
+            return nodes;
+        }
+
+    }
+
+    private static class BranchingToClose {
+        private final Fork fork;
+        private final Decision decision;
+        private final NodeBase redundantParent;
+        private final ImmutableList<PathInformation> paths;
+        private final boolean needToSplitJoin;
+
+        static BranchingToClose withFork(final Fork fork,
+                                         final List<PathInformation> paths,
+                                         final boolean needToSplitJoin) {
+            return new BranchingToClose(fork, null, null, paths, needToSplitJoin);
+        }
+
+        static BranchingToClose withDecision(final Decision decision,
+                                             final List<PathInformation> paths,
+                                             final boolean needToSplitJoin) {
+            return new BranchingToClose(null, decision, null, paths, needToSplitJoin);
+        }
+
+        static BranchingToClose withRedundantParent(final NodeBase redundantParent,
+                                                    final List<PathInformation> paths,
+                                                    final boolean needToSplitJoin) {
+            return new BranchingToClose(null, null, redundantParent, paths, needToSplitJoin);
+        }
+
+        private BranchingToClose(final Fork fork,
+                                 final Decision decision,
+                                 final NodeBase redundantParent,
+                                 final List<PathInformation> paths,
+                                 final boolean needToSplitJoin) {
+            checkOnlyOneIsNotNull(fork, decision, redundantParent);
+
+            this.fork = fork;
+            this.decision = decision;
+            this.redundantParent = redundantParent;
+            this.paths = ImmutableList.copyOf(paths);
+            this.needToSplitJoin = needToSplitJoin;
+        }
+
+        public Fork getFork() {
+            return fork;
+        }
+
+        public Decision getDecision() {
+            return decision;
+        }
+
+        NodeBase getRedundantParent() {
+            return redundantParent;
+        }
+
+        List<PathInformation> getPaths() {
+            return paths;
+        }
+
+        boolean isDecision() {
+            return decision != null;
+        }
+
+        boolean isRedundantParent() {
+            return redundantParent != null;
+        }
+
+        boolean isSplittingJoinNeeded() {
+            return needToSplitJoin;
+        }
+
+        private void checkOnlyOneIsNotNull(final Fork fork, final Decision decision, final NodeBase redundantParent) {
+            int counter = 0;
+
+            if (fork != null) {
+                ++counter;
+            }
+
+            if (decision != null) {
+                ++counter;
+            }
+
+            if (redundantParent != null) {
+                ++counter;
+            }
+
+            Preconditions.checkArgument(counter == 1, "Exactly one of 'fork' and 'redundantParent' must be non-null.");
+        }
+    }
+}