You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:26:24 UTC
[38/51] [partial] falcon git commit: FALCON-1830 Removed code source
directories and updated pom
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
deleted file mode 100644
index e4d9385..0000000
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.v0;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Cluster;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.service.ConfigurationChangeListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * An in-memory graph of entities and relationship among themselves.
- *
- * <p>Every relationship is stored in both directions: when a process reads a feed, the
- * process node lists the feed and the feed node lists the process. The graph is kept
- * up to date through the {@link ConfigurationChangeListener} callbacks.
- *
- * <p>The top-level map is a {@link ConcurrentHashMap}; the adjacency sets stored in it are
- * plain {@link HashSet}s. NOTE(review): concurrent mutation of the same adjacency set is
- * not guarded here - confirm that listener callbacks are serialized by the caller.
- */
-public final class EntityGraph implements ConfigurationChangeListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EntityGraph.class);
-
-    private static EntityGraph instance = new EntityGraph();
-
-    private Map<Node, Set<Node>> graph = new ConcurrentHashMap<Node, Set<Node>>();
-
-    private EntityGraph() {
-    }
-
-    /**
-     * @return the singleton graph instance
-     */
-    public static EntityGraph get() {
-        return instance;
-    }
-
-    /**
-     * Looks up the entities directly connected to the given entity.
-     *
-     * @param entity entity whose neighbours are requested
-     * @return the neighbouring entities resolved through the configuration store, or
-     *         {@code null} when the entity has no node in the graph; neighbours that can
-     *         no longer be found in the store are logged and left out
-     * @throws FalconException if the configuration store lookup fails
-     */
-    public Set<Entity> getDependents(Entity entity) throws FalconException {
-        Node entityNode = new Node(entity.getEntityType(), entity.getName());
-        // Single lookup: a containsKey() + get() pair could observe the key being removed
-        // in between and then iterate over null.
-        Set<Node> neighbours = graph.get(entityNode);
-        if (neighbours == null) {
-            return null;
-        }
-
-        ConfigurationStore store = ConfigurationStore.get();
-        Set<Entity> dependents = new HashSet<Entity>();
-        for (Node node : neighbours) {
-            Entity dependentEntity = store.get(node.type, node.name);
-            if (dependentEntity != null) {
-                dependents.add(dependentEntity);
-            } else {
-                LOG.error("Dependent entity {} was not found in configuration store.", node);
-            }
-        }
-        return dependents;
-    }
-
-    /**
-     * Merges the edges contributed by a newly added process or feed into the graph.
-     * Other entity types contribute no edges and are ignored.
-     */
-    @Override
-    public void onAdd(Entity entity) throws FalconException {
-        Map<Node, Set<Node>> nodeEdges = edgesOf(entity);
-        if (nodeEdges == null) {
-            return;
-        }
-        LOG.debug("Adding edges for {}: {}", entity.getName(), nodeEdges);
-
-        for (Map.Entry<Node, Set<Node>> entry : nodeEdges.entrySet()) {
-            LOG.debug("Adding edges : {} for {}", entry.getValue(), entry.getKey());
-            Set<Node> existing = graph.get(entry.getKey());
-            if (existing != null) {
-                existing.addAll(entry.getValue());
-            } else {
-                graph.put(entry.getKey(), entry.getValue());
-            }
-        }
-        LOG.debug("Merged edges to graph {}", entity.getName());
-    }
-
-    /**
-     * Removes the edges contributed by a process or feed; nodes left without any edge
-     * are dropped from the graph.
-     */
-    @Override
-    public void onRemove(Entity entity) throws FalconException {
-        Map<Node, Set<Node>> nodeEdges = edgesOf(entity);
-        if (nodeEdges == null) {
-            return;
-        }
-
-        for (Map.Entry<Node, Set<Node>> entry : nodeEdges.entrySet()) {
-            Set<Node> existing = graph.get(entry.getKey());
-            if (existing == null) {
-                continue;
-            }
-            existing.removeAll(entry.getValue());
-            if (existing.isEmpty()) {
-                graph.remove(entry.getKey());
-            }
-        }
-    }
-
-    @Override
-    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
-        onRemove(oldEntity);
-        onAdd(newEntity);
-    }
-
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
-    }
-
-    /**
-     * Computes the edges an entity contributes to the graph.
-     *
-     * @return the edge map for a process or feed, {@code null} for any other entity type
-     */
-    private Map<Node, Set<Node>> edgesOf(Entity entity) {
-        switch (entity.getEntityType()) {
-        case PROCESS:
-            return getEdgesFor((Process) entity);
-        case FEED:
-            return getEdgesFor((Feed) entity);
-        default:
-            return null;
-        }
-    }
-
-    /** Returns the edge set registered for {@code node}, creating an empty one on first use. */
-    private static Set<Node> edgeSet(Map<Node, Set<Node>> nodeEdges, Node node) {
-        Set<Node> edges = nodeEdges.get(node);
-        if (edges == null) {
-            edges = new HashSet<Node>();
-            nodeEdges.put(node, edges);
-        }
-        return edges;
-    }
-
-    /** Links two nodes in both directions. */
-    private static void connect(Map<Node, Set<Node>> nodeEdges, Node from, Node to) {
-        edgeSet(nodeEdges, from).add(to);
-        edgeSet(nodeEdges, to).add(from);
-    }
-
-    /** Builds the edges between a process and its input feeds, output feeds and clusters. */
-    private Map<Node, Set<Node>> getEdgesFor(Process process) {
-        Map<Node, Set<Node>> nodeEdges = new HashMap<Node, Set<Node>>();
-        Node processNode = new Node(EntityType.PROCESS, process.getName());
-        // The process node is registered even when it ends up with no edges.
-        edgeSet(nodeEdges, processNode);
-
-        if (process.getInputs() != null) {
-            for (Input input : process.getInputs().getInputs()) {
-                connect(nodeEdges, processNode, new Node(EntityType.FEED, input.getFeed()));
-            }
-        }
-        if (process.getOutputs() != null) {
-            for (Output output : process.getOutputs().getOutputs()) {
-                connect(nodeEdges, processNode, new Node(EntityType.FEED, output.getFeed()));
-            }
-        }
-
-        for (Cluster cluster : process.getClusters().getClusters()) {
-            connect(nodeEdges, processNode, new Node(EntityType.CLUSTER, cluster.getName()));
-        }
-
-        return nodeEdges;
-    }
-
-    /**
-     * Builds the edges between a feed and its clusters, plus the datasource of every
-     * cluster for which import is enabled.
-     */
-    private Map<Node, Set<Node>> getEdgesFor(Feed feed) {
-        Map<Node, Set<Node>> nodeEdges = new HashMap<Node, Set<Node>>();
-        Node feedNode = new Node(EntityType.FEED, feed.getName());
-        // The feed node is registered even when it ends up with no edges.
-        edgeSet(nodeEdges, feedNode);
-
-        for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
-            connect(nodeEdges, feedNode, new Node(EntityType.CLUSTER, cluster.getName()));
-
-            if (FeedHelper.isImportEnabled(cluster)) {
-                Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster));
-                connect(nodeEdges, feedNode, dbNode);
-            }
-        }
-        return nodeEdges;
-    }
-
-    /**
-     * Node element in the graph, identified by entity type and entity name.
-     */
-    private static final class Node {
-
-        private final EntityType type;
-        private final String name;
-
-        private Node(EntityType type, String name) {
-            this.type = type;
-            this.name = name;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-
-            Node node = (Node) o;
-
-            boolean sameName = name == null ? node.name == null : name.equals(node.name);
-            return sameName && type == node.type;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = type != null ? type.hashCode() : 0;
-            result = 31 * result + (name != null ? name.hashCode() : 0);
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return "(" + type + ") " + name;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
deleted file mode 100644
index 4c7e913..0000000
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.v0;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Static helpers that report which entities still reference a given entity.
- */
-public final class EntityIntegrityChecker {
-
-    private EntityIntegrityChecker() {}
-
-    /**
-     * Lists the entities that refer to the given entity, based on the entity graph.
-     *
-     * @param entity entity to check
-     * @return (name, type) pairs of the referring entities: feeds and processes for a
-     *         cluster, processes for a feed, feeds for a datasource; {@code null} when the
-     *         graph has no entry for the entity or the entity type is none of those three
-     * @throws FalconException if the graph lookup fails
-     */
-    public static Pair<String, EntityType>[] referencedBy(Entity entity) throws FalconException {
-        Set<Entity> dependents = EntityGraph.get().getDependents(entity);
-        if (dependents == null) {
-            return null;
-        }
-
-        final EntityType[] referrerTypes;
-        switch (entity.getEntityType()) {
-        case CLUSTER:
-            referrerTypes = new EntityType[] {EntityType.FEED, EntityType.PROCESS};
-            break;
-
-        case FEED:
-            referrerTypes = new EntityType[] {EntityType.PROCESS};
-            break;
-
-        case DATASOURCE:
-            referrerTypes = new EntityType[] {EntityType.FEED};
-            break;
-
-        default:
-            return null;
-        }
-        return filter(dependents, referrerTypes);
-    }
-
-    /**
-     * Keeps only the dependents whose type is one of {@code types} and converts them to
-     * (name, type) pairs.
-     */
-    @SuppressWarnings("unchecked")
-    private static Pair<String, EntityType>[] filter(Set<Entity> deps, EntityType... types) {
-        List<EntityType> accepted = Arrays.asList(types);
-        List<Pair<String, EntityType>> matches = new ArrayList<Pair<String, EntityType>>();
-        for (Entity candidate : deps) {
-            if (!accepted.contains(candidate.getEntityType())) {
-                continue;
-            }
-            matches.add(Pair.of(candidate.getName(), candidate.getEntityType()));
-        }
-        return matches.toArray(new Pair[0]);
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java b/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
deleted file mode 100644
index cad196b..0000000
--- a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.v0;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Thrown when a scheduling action such as Schedule, Suspend or Resume
- * is attempted on an entity that cannot be scheduled, like a CLUSTER.
- */
-public class UnschedulableEntityException extends FalconException {
-
-    private static final long serialVersionUID = -1134342662497698943L;
-
-    /**
-     * @param message description of the failure
-     */
-    public UnschedulableEntityException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message description of the failure
-     * @param e underlying cause
-     */
-    public UnschedulableEntityException(String message, Exception e) {
-        super(message, e);
-    }
-
-    /**
-     * @param e underlying cause
-     */
-    public UnschedulableEntityException(Exception e) {
-        super(e);
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
deleted file mode 100644
index 65aaeba..0000000
--- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.expression;
-
-import org.apache.commons.el.ExpressionEvaluatorImpl;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-import javax.servlet.jsp.el.FunctionMapper;
-import javax.servlet.jsp.el.VariableResolver;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Properties;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Helper for evaluating expressions.
- */
-public final class ExpressionHelper implements FunctionMapper, VariableResolver {
-
- private static final Logger LOG = LoggerFactory.getLogger(ExpressionHelper.class);
- private static final ExpressionHelper INSTANCE = new ExpressionHelper();
-
- private static final ThreadLocal<Properties> THREAD_VARIABLES = new ThreadLocal<Properties>();
-
- private static final Pattern SYS_PROPERTY_PATTERN = Pattern.compile("\\$\\{[A-Za-z0-9_.]+\\}");
-
- private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
- private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
-
- public static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>() {
- @Override
- protected SimpleDateFormat initialValue() {
- SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
- format.setTimeZone(TimeZone.getTimeZone("UTC"));
- return format;
- }
- };
-
- public static ExpressionHelper get() {
- return INSTANCE;
- }
-
- private enum DayOfWeek {
- SUN, MON, TUE, WED, THU, FRI, SAT
- }
-
- private ExpressionHelper() {
- }
-
- public <T> T evaluate(String expression, Class<T> clazz) throws FalconException {
- return evaluateFullExpression("${" + expression + "}", clazz);
- }
-
- @SuppressWarnings("unchecked")
- public <T> T evaluateFullExpression(String expression, Class<T> clazz) throws FalconException {
- try {
- return (T) EVALUATOR.evaluate(expression, clazz, RESOLVER, RESOLVER);
- } catch (ELException e) {
- throw new FalconException("Unable to evaluate " + expression, e);
- }
- }
-
- @Override
- public Method resolveFunction(String prefix, String name) {
- for (Method method : ExpressionHelper.class.getDeclaredMethods()) {
- if (method.getName().equals(name)) {
- return method;
- }
- }
- throw new UnsupportedOperationException("Not found " + prefix + ":" + name);
- }
-
- public void setPropertiesForVariable(Properties properties) {
- THREAD_VARIABLES.set(properties);
- }
-
- @Override
- public Object resolveVariable(String field) {
- return THREAD_VARIABLES.get().get(field);
- }
-
- private static ThreadLocal<Date> referenceDate = new ThreadLocal<Date>();
-
- public static void setReferenceDate(Date date) {
- referenceDate.set(date);
- Properties variables = getTimeVariables(date, TimeZone.getTimeZone("UTC"));
- THREAD_VARIABLES.set(variables);
- }
-
- public static Properties getTimeVariables(Date date, TimeZone tz) {
- Properties vars = new Properties();
- Calendar cal = Calendar.getInstance(tz);
- cal.setTime(date);
- vars.put(FeedDataPath.VARS.YEAR.name(), String.format("%04d", cal.get(Calendar.YEAR)));
- vars.put(FeedDataPath.VARS.MONTH.name(), String.format("%02d", (cal.get(Calendar.MONTH) + 1)));
- vars.put(FeedDataPath.VARS.DAY.name(), String.format("%02d", cal.get(Calendar.DAY_OF_MONTH)));
- vars.put(FeedDataPath.VARS.HOUR.name(), String.format("%02d", cal.get(Calendar.HOUR_OF_DAY)));
- vars.put(FeedDataPath.VARS.MINUTE.name(), String.format("%02d", cal.get(Calendar.MINUTE)));
- return vars;
- }
-
- private static int getDayOffset(String weekDayName) {
- int day;
- Calendar nominalTime = Calendar.getInstance();
- nominalTime.setTimeZone(TimeZone.getTimeZone("UTC"));
- nominalTime.setTime(referenceDate.get());
- int currentWeekDay = nominalTime.get(Calendar.DAY_OF_WEEK);
- int weekDay = DayOfWeek.valueOf(weekDayName).ordinal() + 1; //to map to Calendar.SUNDAY ...
- day = weekDay - currentWeekDay;
- if (weekDay > currentWeekDay) {
- day = day - 7;
- }
- return day;
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SF_SWITCH_FALLTHROUGH"})
- private static Date getRelative(Date date, int boundary, int month, int day, int hour, int minute) {
- Calendar dsInstanceCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- dsInstanceCal.setTime(date);
- switch (boundary) {
- case Calendar.YEAR:
- dsInstanceCal.set(Calendar.MONTH, 0);
- case Calendar.MONTH:
- dsInstanceCal.set(Calendar.DAY_OF_MONTH, 1);
- case Calendar.DAY_OF_MONTH:
- dsInstanceCal.set(Calendar.HOUR_OF_DAY, 0);
- case Calendar.HOUR:
- dsInstanceCal.set(Calendar.MINUTE, 0);
- dsInstanceCal.set(Calendar.SECOND, 0);
- dsInstanceCal.set(Calendar.MILLISECOND, 0);
- break;
- case Calendar.SECOND:
- break;
- default:
- throw new IllegalArgumentException("Invalid boundary " + boundary);
- }
-
- dsInstanceCal.add(Calendar.YEAR, 0);
- dsInstanceCal.add(Calendar.MONTH, month);
- dsInstanceCal.add(Calendar.DAY_OF_MONTH, day);
- dsInstanceCal.add(Calendar.HOUR_OF_DAY, hour);
- dsInstanceCal.add(Calendar.MINUTE, minute);
- return dsInstanceCal.getTime();
- }
-
- public static Date now(int hour, int minute) {
- return getRelative(referenceDate.get(), Calendar.SECOND, 0, 0, hour, minute);
- }
-
- public static Date today(int hour, int minute) {
- return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, 0, hour, minute);
- }
-
- public static Date yesterday(int hour, int minute) {
- return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, -1, hour, minute);
- }
-
- public static Date currentMonth(int day, int hour, int minute) {
- return getRelative(referenceDate.get(), Calendar.MONTH, 0, day, hour, minute);
- }
-
- public static Date lastMonth(int day, int hour, int minute) {
- return getRelative(referenceDate.get(), Calendar.MONTH, -1, day, hour, minute);
- }
-
- public static Date currentWeek(String weekDay, int hour, int minute) {
- int day = getDayOffset(weekDay);
- return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, day, hour, minute);
- }
-
- public static Date lastWeek(String weekDay, int hour, int minute) {
- int day = getDayOffset(weekDay);
- return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, day - 7, hour, minute);
- }
-
- public static Date currentYear(int month, int day, int hour, int minute) {
- return getRelative(referenceDate.get(), Calendar.YEAR, month, day, hour, minute);
- }
-
- public static Date lastYear(int month, int day, int hour, int minute) {
- return getRelative(referenceDate.get(), Calendar.YEAR, month - 12, day, hour, minute);
- }
-
- public static Date latest(int n) {
- //by pass Falcon validations
- return referenceDate.get();
- }
-
- public static Date future(int n, int limit) {
- //by pass Falcon validations
- return referenceDate.get();
- }
-
- public static long hours(int val) {
- return TimeUnit.HOURS.toMillis(val);
- }
-
- public static long minutes(int val) {
- return TimeUnit.MINUTES.toMillis(val);
- }
-
- public static long days(int val) {
- return TimeUnit.DAYS.toMillis(val);
- }
-
- public static long months(int val) {
- return val * days(31);
- }
-
- public static long years(int val) {
- return val * days(366);
- }
-
- public static String substitute(String originalValue) {
- return substitute(originalValue, System.getProperties());
- }
-
- public static String substitute(String originalValue, Properties properties) {
- Matcher envVarMatcher = SYS_PROPERTY_PATTERN.matcher(originalValue);
- while (envVarMatcher.find()) {
- String envVar = originalValue.substring(envVarMatcher.start() + 2,
- envVarMatcher.end() - 1);
- String envVal = properties.getProperty(envVar, System.getenv(envVar));
-
- envVar = "\\$\\{" + envVar + "\\}";
- if (envVal != null) {
- originalValue = originalValue.replaceAll(envVar, Matcher.quoteReplacement(envVal));
- envVarMatcher = SYS_PROPERTY_PATTERN.matcher(originalValue);
- }
- }
- return originalValue;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
deleted file mode 100644
index d288925..0000000
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.group;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.LocationType;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
-
-/**
- * Group, which represents a logical group of feeds which can belong to this
- * group.
- */
-public class FeedGroup {
-
- public FeedGroup(String group, Frequency frequency, String path) {
- this.name = group;
- this.frequency = frequency;
- this.datePattern = getDatePattern(path);
- this.feeds = Collections
- .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
- }
-
- public static String getDatePattern(String path) {
- Matcher matcher = FeedDataPath.PATTERN.matcher(path);
- List<String> fields = new ArrayList<String>();
- while (matcher.find()) {
- String var = path.substring(matcher.start(), matcher.end());
- fields.add(var);
- }
- Collections.sort(fields);
- return fields.toString();
- }
-
- private String name;
- private Frequency frequency;
- private String datePattern;
- private Set<String> feeds;
-
- public Set<String> getFeeds() {
- return feeds;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || !(obj instanceof FeedGroup)) {
- return false;
- }
- FeedGroup group = (FeedGroup) obj;
- return (this.name.equals(group.getName())
- && this.frequency.equals(group.frequency)
- && this.datePattern
- .equals(group.datePattern));
-
- }
-
- @Override
- public int hashCode() {
- return 127 * name.hashCode() + 31 * frequency.hashCode() + datePattern.hashCode();
- }
-
- public String getName() {
- return name;
- }
-
- public Frequency getFrequency() {
- return frequency;
- }
-
- public String getDatePattern() {
- return datePattern;
- }
-
- public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
- return this.frequency.equals(feed.getFrequency())
- && this.datePattern.equals(getDatePattern(
- FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA)));
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
deleted file mode 100644
index a832366..0000000
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.group;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.service.ConfigurationChangeListener;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Has 2 way mappings from feed to group and group to feed.
- *
- * <p>Groups are keyed by name; each {@link FeedGroup} holds the names of its member feeds.
- * The mapping is maintained through the {@link ConfigurationChangeListener} callbacks and
- * only reacts to feed entities.
- */
-public final class FeedGroupMap implements ConfigurationChangeListener {
-
-    private static final FeedGroupMap INSTANCE = new FeedGroupMap();
-    private Map<String, FeedGroup> groupsMapping = new ConcurrentHashMap<String, FeedGroup>();
-
-    private FeedGroupMap() {
-        // singleton
-    }
-
-    /**
-     * @return the singleton map
-     */
-    public static FeedGroupMap get() {
-        return INSTANCE;
-    }
-
-    /**
-     * @return a read-only view of the group name to group mapping
-     */
-    public Map<String, FeedGroup> getGroupsMapping() {
-        return Collections.unmodifiableMap(groupsMapping);
-    }
-
-    /**
-     * Registers a feed under each of its groups. Feeds without groups and non-feed
-     * entities are ignored.
-     */
-    @Override
-    public void onAdd(Entity entity) throws FalconException {
-
-        if (entity.getEntityType().equals(EntityType.FEED)) {
-            Feed feed = (Feed) entity;
-            // Same emptiness check as onRemove.
-            if (StringUtils.isEmpty(feed.getGroups())) {
-                return;
-            }
-            Set<FeedGroup> groupSet = getGroups(feed);
-            addGroups(feed.getName(), groupSet);
-        }
-    }
-
-    /**
-     * Removes a feed from each of its groups and drops groups that become empty.
-     * Groups named by the feed but not present in the mapping are skipped.
-     */
-    @Override
-    public void onRemove(Entity entity) throws FalconException {
-        if (entity.getEntityType().equals(EntityType.FEED)) {
-            Feed feed = (Feed) entity;
-            if (StringUtils.isEmpty(feed.getGroups())) {
-                return;
-            }
-            String[] groups = feed.getGroups().split(",");
-            for (String group : groups) {
-                FeedGroup feedGroup = groupsMapping.get(group);
-                if (feedGroup == null) {
-                    // Never registered or already removed: nothing to clean up.
-                    continue;
-                }
-                feedGroup.getFeeds().remove(entity.getName());
-                if (feedGroup.getFeeds().isEmpty()) {
-                    groupsMapping.remove(group);
-                }
-            }
-
-        }
-    }
-
-    @Override
-    public void onChange(Entity oldEntity, Entity newEntity)
-        throws FalconException {
-
-        onRemove(oldEntity);
-        onAdd(newEntity);
-    }
-
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
-    }
-
-    /**
-     * Adds the feed to every listed group, reusing a group of the same name when one is
-     * already registered and registering the given one otherwise.
-     */
-    private void addGroups(String feed, Set<FeedGroup> groups) {
-        for (FeedGroup group : groups) {
-            // Single lookup instead of containsKey() + get(), which could see the group
-            // removed in between.
-            FeedGroup registered = groupsMapping.get(group.getName());
-            if (registered != null) {
-                registered.getFeeds().add(feed);
-            } else {
-                group.getFeeds().add(feed);
-                groupsMapping.put(group.getName(), group);
-            }
-        }
-    }
-
-    /**
-     * Builds one {@link FeedGroup} per name in a comma-separated list.
-     *
-     * @param groups comma-separated group names; names are used exactly as split, without trimming
-     * @param frequency frequency given to every group
-     * @param path data path given to every group
-     * @return the set of groups
-     */
-    public Set<FeedGroup> getGroups(String groups, Frequency frequency, String path) {
-        Set<FeedGroup> groupSet = new HashSet<FeedGroup>();
-        String[] groupArray = groups.split(",");
-        for (String group : groupArray) {
-            groupSet.add(new FeedGroup(group, frequency, path));
-        }
-        return groupSet;
-    }
-
-    /**
-     * Builds the groups of a feed from its group list, frequency and DATA location template.
-     *
-     * @throws FalconException if the feed's storage cannot be created
-     */
-    public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
-        return getGroups(feed.getGroups(), feed.getFrequency(),
-                FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
deleted file mode 100644
index e33d353..0000000
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hadoop;
-
-import org.apache.commons.lang.Validate;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
-
-/**
- * A factory implementation to dole out FileSystem handles based on the logged in user.
- */
-public final class HadoopClientFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(HadoopClientFactory.class);
-
- public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
- public static final String MR_JT_ADDRESS_KEY = "mapreduce.jobtracker.address";
- public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address";
-
- public static final FsPermission READ_EXECUTE_PERMISSION =
- new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);
- public static final FsPermission ALL_PERMISSION =
- new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-
- private static final HadoopClientFactory INSTANCE = new HadoopClientFactory();
-
- private HadoopClientFactory() {
- }
-
- public static HadoopClientFactory get() {
- return INSTANCE;
- }
-
- /**
- * This method is only used by Falcon internally to talk to the config store on HDFS.
- *
- * @param uri file system URI for config store.
- * @return FileSystem created with the provided proxyUser/group.
- * @throws org.apache.falcon.FalconException
- * if the filesystem could not be created.
- */
- public FileSystem createFalconFileSystem(final URI uri) throws FalconException {
- Validate.notNull(uri, "uri cannot be null");
-
- try {
- Configuration conf = new Configuration();
- if (UserGroupInformation.isSecurityEnabled()) {
- conf.set(SecurityUtil.NN_PRINCIPAL, StartupProperties.get().getProperty(SecurityUtil.NN_PRINCIPAL));
- }
-
- return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf);
- } catch (IOException e) {
- throw new FalconException("Exception while getting FileSystem for: " + uri, e);
- }
- }
-
- /**
- * This method is only used by Falcon internally to talk to the config store on HDFS.
- *
- * @param conf configuration.
- * @return FileSystem created with the provided proxyUser/group.
- * @throws org.apache.falcon.FalconException
- * if the filesystem could not be created.
- */
- public FileSystem createFalconFileSystem(final Configuration conf)
- throws FalconException {
- Validate.notNull(conf, "configuration cannot be null");
-
- String nameNode = getNameNode(conf);
- try {
- return createFileSystem(UserGroupInformation.getLoginUser(), new URI(nameNode), conf);
- } catch (URISyntaxException e) {
- throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
- } catch (IOException e) {
- throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
- }
- }
-
- /**
- * Return a FileSystem created with the authenticated proxy user for the specified conf.
- *
- * @param conf Configuration with all necessary information to create the FileSystem.
- * @return FileSystem created with the provided proxyUser/group.
- * @throws org.apache.falcon.FalconException
- * if the filesystem could not be created.
- */
- public FileSystem createProxiedFileSystem(final Configuration conf)
- throws FalconException {
- Validate.notNull(conf, "configuration cannot be null");
-
- String nameNode = getNameNode(conf);
- try {
- return createProxiedFileSystem(new URI(nameNode), conf);
- } catch (URISyntaxException e) {
- throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
- }
- }
-
- private static String getNameNode(Configuration conf) {
- return conf.get(FS_DEFAULT_NAME_KEY);
- }
-
- /**
- * This method is called from with in a workflow execution context.
- *
- * @param uri uri
- * @return file system handle
- * @throws FalconException
- */
- public FileSystem createProxiedFileSystem(final URI uri) throws FalconException {
- return createProxiedFileSystem(uri, new Configuration());
- }
-
- public FileSystem createProxiedFileSystem(final URI uri,
- final Configuration conf) throws FalconException {
- Validate.notNull(uri, "uri cannot be null");
-
- try {
- return createFileSystem(CurrentUser.getProxyUGI(), uri, conf);
- } catch (IOException e) {
- throw new FalconException("Exception while getting FileSystem for proxy: "
- + CurrentUser.getUser(), e);
- }
- }
-
- /**
- * Return a FileSystem created with the provided user for the specified URI.
- *
- * @param ugi user group information
- * @param uri file system URI.
- * @param conf Configuration with all necessary information to create the FileSystem.
- * @return FileSystem created with the provided user/group.
- * @throws org.apache.falcon.FalconException
- * if the filesystem could not be created.
- */
- @SuppressWarnings("ResultOfMethodCallIgnored")
- public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri,
- final Configuration conf) throws FalconException {
- Validate.notNull(ugi, "ugi cannot be null");
- Validate.notNull(conf, "configuration cannot be null");
-
- try {
- if (UserGroupInformation.isSecurityEnabled()) {
- ugi.checkTGTAndReloginFromKeytab();
- }
- } catch (IOException ioe) {
- throw new FalconException("Exception while getting FileSystem. Unable to check TGT for user "
- + ugi.getShortUserName(), ioe);
- }
-
- validateNameNode(uri, conf);
-
- try {
- // prevent falcon impersonating falcon, no need to use doas
- final String proxyUserName = ugi.getShortUserName();
- if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
- LOG.info("Creating FS for the login user {}, impersonation not required",
- proxyUserName);
- return FileSystem.get(uri, conf);
- }
-
- LOG.info("Creating FS impersonating user {}", proxyUserName);
- return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
- public FileSystem run() throws Exception {
- return FileSystem.get(uri, conf);
- }
- });
- } catch (InterruptedException ex) {
- throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
- } catch (IOException ex) {
- throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
- }
- }
-
- /**
- * This method validates if the execute url is able to reach the MR endpoint.
- *
- * @param executeUrl jt url or RM url
- * @throws IOException
- */
- public void validateJobClient(String executeUrl) throws IOException {
- final JobConf jobConf = new JobConf();
- jobConf.set(MR_JT_ADDRESS_KEY, executeUrl);
- jobConf.set(YARN_RM_ADDRESS_KEY, executeUrl);
-
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
- try {
- JobClient jobClient = loginUser.doAs(new PrivilegedExceptionAction<JobClient>() {
- public JobClient run() throws Exception {
- return new JobClient(jobConf);
- }
- });
-
- jobClient.getClusterStatus().getMapTasks();
- } catch (InterruptedException e) {
- throw new IOException("Exception creating job client:" + e.getMessage(), e);
- }
- }
-
- public static FsPermission getDirDefaultPermission(Configuration conf) {
- return getDirDefault().applyUMask(FsPermission.getUMask(conf));
- }
-
- public static FsPermission getFileDefaultPermission(Configuration conf) {
- return getFileDefault().applyUMask(FsPermission.getUMask(conf));
- }
-
- public static FsPermission getDirDefault() {
- return new FsPermission((short)511);
- }
-
- public static FsPermission getFileDefault() {
- return new FsPermission((short)438);
- }
-
- public static void mkdirsWithDefaultPerms(FileSystem fs, Path path) throws IOException {
- mkdirs(fs, path, getDirDefaultPermission(fs.getConf()));
- }
-
- public static void mkdirs(FileSystem fs, Path path,
- FsPermission permission) throws IOException {
- if (!FileSystem.mkdirs(fs, path, permission)) {
- throw new IOException("mkdir failed for " + path);
- }
- }
-
- private void validateNameNode(URI uri, Configuration conf) throws FalconException {
- String nameNode = uri.getAuthority();
- if (nameNode == null) {
- nameNode = getNameNode(conf);
- if (nameNode != null) {
- try {
- new URI(nameNode).getAuthority();
- } catch (URISyntaxException ex) {
- throw new FalconException("Exception while getting FileSystem", ex);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java b/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
deleted file mode 100644
index 5bcc2f8..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Abstract factory class for feed lifecycle policy builders.
- */
-public abstract class AbstractPolicyBuilderFactory {
-
- public abstract PolicyBuilder getPolicyBuilder(String policyName) throws FalconException;
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
deleted file mode 100644
index 833ad04..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.lifecycle;
-
-/**
- * Enum for valid lifecycle stages for the feed.
- */
-public enum FeedLifecycleStage {
-
- RETENTION("AgeBasedDelete");
-
- private String defaultPolicyName;
-
- private FeedLifecycleStage(String defaultPolicyName) {
- this.defaultPolicyName = defaultPolicyName;
- }
-
- public String getDefaultPolicyName() {
- return defaultPolicyName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
deleted file mode 100644
index be4e68c..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.hadoop.fs.Path;
-
-import java.util.Properties;
-
-/**
- * Interface for all policies in feed lifecycle.
- */
-public interface LifecyclePolicy {
-
- /**
- * Returns the name of the policy. Name of policy must be unique as it is used as an identifier.
- * @return name of the policy
- */
- String getName();
-
- /**
- * Returns the stage to which the policy belongs.
- * @return stage to which the policy belongs.
- */
- FeedLifecycleStage getStage();
-
- /**
- * Validates the configurations as per this policy.
- * @param feed Parent feed for which the policy is configured.
- * @param clusterName cluster to be used as context for validation.
- * @throws FalconException
- */
- void validate(Feed feed, String clusterName) throws FalconException;
-
- /**
- * Builds workflow engine artifacts.
- * @param cluster cluster to be used as context
- * @param buildPath base path to be used for storing the artifacts.
- * @param feed Parent feed.
- * @return Properties to be passed to the caller e.g. bundle in case of oozie workflow engine.
- * @throws FalconException
- */
- Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException;
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java b/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
deleted file mode 100644
index 5e5055b..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.hadoop.fs.Path;
-
-import java.util.Properties;
-
-/**
- * Interface to be implemented by all policy builders for a lifecycle policy.
- * A Builder builds workflow engine specific artifacts for a policy.
- */
-public interface PolicyBuilder {
-
- Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException;
-
- String getPolicyName();
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
deleted file mode 100644
index 8d735f9..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle.retention;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.parser.ValidationException;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.entity.v0.feed.RetentionStage;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.util.StartupProperties;
-
-import java.util.Date;
-
-/**
- * Retention policy which deletes all instances of instance time older than a given time.
- * It will create the workflow and coordinators for this policy.
- */
-public class AgeBasedDelete extends RetentionPolicy {
-
- public static final String LIMIT_PROPERTY_NAME = "retention.policy.agebaseddelete.limit";
-
- @Override
- public void validate(Feed feed, String clusterName) throws FalconException {
- // validate that it is a valid cluster
- Cluster cluster = FeedHelper.getCluster(feed, clusterName);
- Frequency retentionLimit = getRetentionLimit(feed, clusterName);
- if (cluster != null) {
- validateLimitWithSla(feed, cluster, retentionLimit.toString());
- validateLimitWithLateData(feed, cluster, retentionLimit.toString());
- String lifecycleEngine = StartupProperties.get().getProperty("lifecycle.engine.impl",
- "org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory");
- if ("org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory".equals(lifecycleEngine)) {
- validateRetentionFrequencyForOozie(feed, clusterName);
- }
- }
- }
-
-
- private void validateRetentionFrequencyForOozie(Feed feed, String clusterName) throws FalconException {
- // retention shouldn't be more frequent than hours(1) for Oozie Builders.
- Frequency retentionFrequency = FeedHelper.getLifecycleRetentionFrequency(feed, clusterName);
- if (retentionFrequency.getTimeUnit() == Frequency.TimeUnit.minutes
- && retentionFrequency.getFrequencyAsInt() < 60) {
- throw new ValidationException("Feed Retention can not be more frequent than hours(1)");
- }
- }
-
- private void validateLimitWithLateData(Feed feed, Cluster cluster, String retention) throws FalconException {
- ExpressionHelper evaluator = ExpressionHelper.get();
- long retentionPeriod = evaluator.evaluate(retention, Long.class);
-
- if (feed.getLateArrival() != null) {
- String feedCutoff = feed.getLateArrival().getCutOff().toString();
- long feedCutOffPeriod = evaluator.evaluate(feedCutoff, Long.class);
- if (retentionPeriod < feedCutOffPeriod) {
- throw new ValidationException(
- "Feed's retention limit: " + retention + " of referenced cluster " + cluster.getName()
- + " should be more than feed's late arrival cut-off period: " + feedCutoff
- + " for feed: " + feed.getName());
- }
- }
- }
-
- private void validateLimitWithSla(Feed feed, Cluster cluster, String retentionExpression) throws FalconException {
- // test that slaHigh is less than retention
- Sla clusterSla = FeedHelper.getSLA(cluster, feed);
- if (clusterSla != null) {
- ExpressionHelper evaluator = ExpressionHelper.get();
- ExpressionHelper.setReferenceDate(new Date());
-
- Frequency slaHighExpression = clusterSla.getSlaHigh();
- Date slaHigh = new Date(evaluator.evaluate(slaHighExpression.toString(), Long.class));
-
- Date retention = new Date(evaluator.evaluate(retentionExpression, Long.class));
- if (slaHigh.after(retention)) {
- throw new ValidationException("slaHigh of Feed: " + slaHighExpression
- + " is greater than retention of the feed: " + retentionExpression
- + " for cluster: " + cluster.getName()
- );
- }
- }
- }
-
- public Frequency getRetentionLimit(Feed feed, String clusterName) throws FalconException {
- RetentionStage retention = FeedHelper.getRetentionStage(feed, clusterName);
- if (retention != null) {
- String limit = null;
- for (Property property : retention.getProperties().getProperties()) {
- if (StringUtils.equals(property.getName(), LIMIT_PROPERTY_NAME)) {
- limit = property.getValue();
- }
- }
- if (limit == null) {
- throw new FalconException("Property: " + LIMIT_PROPERTY_NAME + " is required for "
- + getName() + " policy.");
- }
- try {
- return new Frequency(limit);
- } catch (IllegalArgumentException e) {
- throw new FalconException("Invalid value for property: " + LIMIT_PROPERTY_NAME + ", should be a valid "
- + "frequency e.g. hours(2)", e);
- }
- } else {
- throw new FalconException("Cluster " + clusterName + " doesn't contain retention stage");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
deleted file mode 100644
index 7fd6175..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle.retention;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
-import org.apache.falcon.lifecycle.FeedLifecycleStage;
-import org.apache.falcon.lifecycle.LifecyclePolicy;
-import org.apache.falcon.lifecycle.PolicyBuilder;
-import org.apache.falcon.workflow.WorkflowEngineFactory;
-import org.apache.hadoop.fs.Path;
-
-import java.util.Properties;
-
-/**
- * All retention policies must implement this interface.
- */
-public abstract class RetentionPolicy implements LifecyclePolicy {
-
- @Override
- public String getName() {
- return this.getClass().getSimpleName();
- }
-
- @Override
- public FeedLifecycleStage getStage() {
- return FeedLifecycleStage.RETENTION;
- }
-
- @Override
- public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
- AbstractPolicyBuilderFactory factory = WorkflowEngineFactory.getLifecycleEngine();
- PolicyBuilder builder = factory.getPolicyBuilder(getName());
- return builder.build(cluster, buildPath, feed);
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
deleted file mode 100644
index 25bbf0c..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ /dev/null
@@ -1,514 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import com.tinkerpop.blueprints.Graph;
-import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.datasource.Datasource;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Entity Metadata relationship mapping helper.
- */
-public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
-
- private static final Logger LOG = LoggerFactory.getLogger(EntityRelationshipGraphBuilder.class);
-
-
- public EntityRelationshipGraphBuilder(Graph graph, boolean preserveHistory) {
- super(graph, preserveHistory);
- }
-
- public void addEntity(Entity entity) {
- EntityType entityType = entity.getEntityType();
- switch (entityType) {
- case CLUSTER:
- addClusterEntity((Cluster) entity);
- break;
- case PROCESS:
- addProcessEntity((Process) entity);
- break;
- case FEED:
- addFeedEntity((Feed) entity);
- break;
- case DATASOURCE:
- addDatasourceEntity((Datasource) entity);
- break;
-
- default:
- throw new IllegalArgumentException("Invalid EntityType " + entityType);
- }
- }
-
- public void addClusterEntity(Cluster clusterEntity) {
- LOG.info("Adding cluster entity: {}", clusterEntity.getName());
- Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY);
-
- addUserRelation(clusterVertex);
- addColoRelation(clusterEntity.getColo(), clusterVertex);
- addDataClassification(clusterEntity.getTags(), clusterVertex);
- }
-
- public void addFeedEntity(Feed feed) {
- LOG.info("Adding feed entity: {}", feed.getName());
- Vertex feedVertex = addVertex(feed.getName(), RelationshipType.FEED_ENTITY);
-
- addUserRelation(feedVertex);
- addDataClassification(feed.getTags(), feedVertex);
- addGroups(feed.getGroups(), feedVertex);
-
- for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
- if (ClusterType.TARGET != feedCluster.getType()) {
- addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
- }
- }
-
- for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
- if (FeedHelper.isImportEnabled(feedCluster)) {
- addRelationToDatasource(feedVertex, FeedHelper.getImportDatasourceName(feedCluster),
- RelationshipLabel.DATASOURCE_IMPORT_EDGE);
- }
- }
- }
-
- public void addDatasourceEntity(Datasource dsEntity) {
- LOG.info("Adding datasource entity: {}", dsEntity.getName());
- Vertex dsVertex = addVertex(dsEntity.getName(), RelationshipType.DATASOURCE_ENTITY);
-
- addUserRelation(dsVertex);
- addColoRelation(dsEntity.getColo(), dsVertex);
- addDataClassification(dsEntity.getTags(), dsVertex);
- }
-
-
- public void updateEntity(Entity oldEntity, Entity newEntity) {
- EntityType entityType = oldEntity.getEntityType();
- switch (entityType) {
- case CLUSTER:
- // a cluster cannot be updated
- break;
- case PROCESS:
- updateProcessEntity((Process) oldEntity, (Process) newEntity);
- break;
- case FEED:
- updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
- break;
- default:
- throw new IllegalArgumentException("Invalid EntityType " + entityType);
- }
- }
-
-
-
- public void updateFeedEntity(Feed oldFeed, Feed newFeed) {
- LOG.info("Updating feed entity: {}", newFeed.getName());
- Vertex feedEntityVertex = findVertex(oldFeed.getName(), RelationshipType.FEED_ENTITY);
- if (feedEntityVertex == null) {
- LOG.error("Illegal State: Feed entity vertex must exist for {}", oldFeed.getName());
- throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
- }
-
- updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex);
- updateGroups(oldFeed.getGroups(), newFeed.getGroups(), feedEntityVertex);
- updateFeedClusters(oldFeed.getClusters().getClusters(),
- newFeed.getClusters().getClusters(), feedEntityVertex);
- }
-
- public void addProcessEntity(Process process) {
- String processName = process.getName();
- LOG.info("Adding process entity: {}", processName);
- Vertex processVertex = addVertex(processName, RelationshipType.PROCESS_ENTITY);
- addWorkflowProperties(process.getWorkflow(), processVertex, processName);
-
- addUserRelation(processVertex);
- addDataClassification(process.getTags(), processVertex);
- addPipelines(process.getPipelines(), processVertex);
-
- for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
- addRelationToCluster(processVertex, cluster.getName(), RelationshipLabel.PROCESS_CLUSTER_EDGE);
- }
-
- addInputFeeds(process.getInputs(), processVertex);
- addOutputFeeds(process.getOutputs(), processVertex);
- }
-
- public void updateProcessEntity(Process oldProcess, Process newProcess) {
- LOG.info("Updating process entity: {}", newProcess.getName());
- Vertex processEntityVertex = findVertex(oldProcess.getName(), RelationshipType.PROCESS_ENTITY);
- if (processEntityVertex == null) {
- LOG.error("Illegal State: Process entity vertex must exist for {}", oldProcess.getName());
- throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist");
- }
-
- updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(),
- processEntityVertex, newProcess.getName());
- updateDataClassification(oldProcess.getTags(), newProcess.getTags(), processEntityVertex);
- updatePipelines(oldProcess.getPipelines(), newProcess.getPipelines(), processEntityVertex);
- updateProcessClusters(oldProcess.getClusters().getClusters(),
- newProcess.getClusters().getClusters(), processEntityVertex);
- updateProcessInputs(oldProcess.getInputs(), newProcess.getInputs(), processEntityVertex);
- updateProcessOutputs(oldProcess.getOutputs(), newProcess.getOutputs(), processEntityVertex);
- }
-
- public void addColoRelation(String colo, Vertex fromVertex) {
- Vertex coloVertex = addVertex(colo, RelationshipType.COLO);
- addEdge(fromVertex, coloVertex, RelationshipLabel.CLUSTER_COLO.getName());
- }
-
- public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) {
- Vertex clusterVertex = findVertex(clusterName, RelationshipType.CLUSTER_ENTITY);
- if (clusterVertex == null) { // cluster must exist before adding other entities
- LOG.error("Illegal State: Cluster entity vertex must exist for {}", clusterName);
- throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
- }
-
- addEdge(fromVertex, clusterVertex, edgeLabel.getName());
- }
-
- public void addRelationToDatasource(Vertex fromVertex, String datasourceName, RelationshipLabel edgeLabel) {
- Vertex clusterVertex = findVertex(datasourceName, RelationshipType.DATASOURCE_ENTITY);
- if (clusterVertex == null) { // cluster must exist before adding other entities
- LOG.error("Illegal State: Datasource entity vertex must exist for {}", datasourceName);
- throw new IllegalStateException("Datasource entity vertex must exist: " + datasourceName);
- }
-
- addEdge(fromVertex, clusterVertex, edgeLabel.getName());
- }
-
- public void addInputFeeds(Inputs inputs, Vertex processVertex) {
- if (inputs == null) {
- return;
- }
-
- for (Input input : inputs.getInputs()) {
- addProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE);
- }
- }
-
- public void addOutputFeeds(Outputs outputs, Vertex processVertex) {
- if (outputs == null) {
- return;
- }
-
- for (Output output : outputs.getOutputs()) {
- addProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE);
- }
- }
-
- public void addProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
- Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
- if (feedVertex == null) {
- LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
- throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
- }
-
- addProcessFeedEdge(processVertex, feedVertex, edgeLabel);
- }
-
- public void addWorkflowProperties(Workflow workflow, Vertex processVertex, String processName) {
- processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
- ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
- processVertex.setProperty(RelationshipProperty.VERSION.getName(), workflow.getVersion());
- processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
- workflow.getEngine().value());
- }
-
- public void updateWorkflowProperties(Workflow oldWorkflow, Workflow newWorkflow,
- Vertex processEntityVertex, String processName) {
- if (areSame(oldWorkflow, newWorkflow)) {
- return;
- }
-
- LOG.info("Updating workflow properties for: {}", processEntityVertex);
- addWorkflowProperties(newWorkflow, processEntityVertex, processName);
- }
-
- public void updateDataClassification(String oldClassification, String newClassification,
- Vertex entityVertex) {
- if (areSame(oldClassification, newClassification)) {
- return;
- }
-
- removeDataClassification(oldClassification, entityVertex);
- addDataClassification(newClassification, entityVertex);
- }
-
- private void removeDataClassification(String classification, Vertex entityVertex) {
- if (classification == null || classification.length() == 0) {
- return;
- }
-
- String[] oldTags = classification.split(",");
- for (String oldTag : oldTags) {
- int index = oldTag.indexOf("=");
- String tagKey = oldTag.substring(0, index);
- String tagValue = oldTag.substring(index + 1, oldTag.length());
-
- removeEdge(entityVertex, tagValue, tagKey);
- }
- }
-
- public void updateGroups(String oldGroups, String newGroups, Vertex entityVertex) {
- if (areSame(oldGroups, newGroups)) {
- return;
- }
-
- removeGroups(oldGroups, entityVertex);
- addGroups(newGroups, entityVertex);
- }
-
- public void updatePipelines(String oldPipelines, String newPipelines, Vertex entityVertex) {
- if (areSame(oldPipelines, newPipelines)) {
- return;
- }
-
- removePipelines(oldPipelines, entityVertex);
- addPipelines(newPipelines, entityVertex);
- }
-
- private void removeGroups(String groups, Vertex entityVertex) {
- removeGroupsOrPipelines(groups, entityVertex, RelationshipLabel.GROUPS);
- }
-
- private void removePipelines(String pipelines, Vertex entityVertex) {
- removeGroupsOrPipelines(pipelines, entityVertex, RelationshipLabel.PIPELINES);
- }
-
- private void removeGroupsOrPipelines(String groupsOrPipelines, Vertex entityVertex,
- RelationshipLabel edgeLabel) {
- if (StringUtils.isEmpty(groupsOrPipelines)) {
- return;
- }
-
- String[] oldGroupOrPipelinesTags = groupsOrPipelines.split(",");
- for (String groupOrPipelineTag : oldGroupOrPipelinesTags) {
- removeEdge(entityVertex, groupOrPipelineTag, edgeLabel.getName());
- }
- }
-
- public static boolean areSame(String oldValue, String newValue) {
- return oldValue == null && newValue == null
- || oldValue != null && newValue != null && oldValue.equals(newValue);
- }
-
- public void updateFeedClusters(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters,
- List<org.apache.falcon.entity.v0.feed.Cluster> newClusters,
- Vertex feedEntityVertex) {
- if (areFeedClustersSame(oldClusters, newClusters)) {
- return;
- }
-
- // remove edges to old clusters
- for (org.apache.falcon.entity.v0.feed.Cluster oldCuster : oldClusters) {
- if (ClusterType.TARGET != oldCuster.getType()) {
- removeEdge(feedEntityVertex, oldCuster.getName(),
- RelationshipLabel.FEED_CLUSTER_EDGE.getName());
- }
- }
-
- // add edges to new clusters
- for (org.apache.falcon.entity.v0.feed.Cluster newCluster : newClusters) {
- if (ClusterType.TARGET != newCluster.getType()) {
- addRelationToCluster(feedEntityVertex, newCluster.getName(),
- RelationshipLabel.FEED_CLUSTER_EDGE);
- }
- }
- }
-
- public boolean areFeedClustersSame(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters,
- List<org.apache.falcon.entity.v0.feed.Cluster> newClusters) {
- if (oldClusters.size() != newClusters.size()) {
- return false;
- }
-
- List<String> oldClusterNames = getFeedClusterNames(oldClusters);
- List<String> newClusterNames = getFeedClusterNames(newClusters);
-
- return oldClusterNames.size() == newClusterNames.size()
- && oldClusterNames.containsAll(newClusterNames)
- && newClusterNames.containsAll(oldClusterNames);
- }
-
- public List<String> getFeedClusterNames(List<org.apache.falcon.entity.v0.feed.Cluster> clusters) {
- List<String> clusterNames = new ArrayList<String>(clusters.size());
- for (org.apache.falcon.entity.v0.feed.Cluster cluster : clusters) {
- clusterNames.add(cluster.getName());
- }
-
- return clusterNames;
- }
-
- public void updateProcessClusters(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters,
- List<org.apache.falcon.entity.v0.process.Cluster> newClusters,
- Vertex processEntityVertex) {
- if (areProcessClustersSame(oldClusters, newClusters)) {
- return;
- }
-
- // remove old clusters
- for (org.apache.falcon.entity.v0.process.Cluster oldCuster : oldClusters) {
- removeEdge(processEntityVertex, oldCuster.getName(),
- RelationshipLabel.PROCESS_CLUSTER_EDGE.getName());
- }
-
- // add new clusters
- for (org.apache.falcon.entity.v0.process.Cluster newCluster : newClusters) {
- addRelationToCluster(processEntityVertex, newCluster.getName(),
- RelationshipLabel.PROCESS_CLUSTER_EDGE);
- }
- }
-
- public boolean areProcessClustersSame(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters,
- List<org.apache.falcon.entity.v0.process.Cluster> newClusters) {
- if (oldClusters.size() != newClusters.size()) {
- return false;
- }
-
- List<String> oldClusterNames = getProcessClusterNames(oldClusters);
- List<String> newClusterNames = getProcessClusterNames(newClusters);
-
- return oldClusterNames.size() == newClusterNames.size()
- && oldClusterNames.containsAll(newClusterNames)
- && newClusterNames.containsAll(oldClusterNames);
- }
-
- public List<String> getProcessClusterNames(List<org.apache.falcon.entity.v0.process.Cluster> clusters) {
- List<String> clusterNames = new ArrayList<String>(clusters.size());
- for (org.apache.falcon.entity.v0.process.Cluster cluster : clusters) {
- clusterNames.add(cluster.getName());
- }
-
- return clusterNames;
- }
-
- public static boolean areSame(Workflow oldWorkflow, Workflow newWorkflow) {
- return areSame(oldWorkflow.getName(), newWorkflow.getName())
- && areSame(oldWorkflow.getVersion(), newWorkflow.getVersion())
- && areSame(oldWorkflow.getEngine().value(), newWorkflow.getEngine().value());
- }
-
- private void updateProcessInputs(Inputs oldProcessInputs, Inputs newProcessInputs,
- Vertex processEntityVertex) {
- if (areSame(oldProcessInputs, newProcessInputs)) {
- return;
- }
-
- removeInputFeeds(oldProcessInputs, processEntityVertex);
- addInputFeeds(newProcessInputs, processEntityVertex);
- }
-
- public static boolean areSame(Inputs oldProcessInputs, Inputs newProcessInputs) {
- if (oldProcessInputs == null && newProcessInputs == null) {
- return true;
- }
-
- if (oldProcessInputs == null || newProcessInputs == null
- || oldProcessInputs.getInputs().size() != newProcessInputs.getInputs().size()) {
- return false;
- }
-
- List<Input> oldInputs = oldProcessInputs.getInputs();
- List<Input> newInputs = newProcessInputs.getInputs();
-
- return oldInputs.size() == newInputs.size()
- && oldInputs.containsAll(newInputs)
- && newInputs.containsAll(oldInputs);
- }
-
- public void removeInputFeeds(Inputs inputs, Vertex processVertex) {
- if (inputs == null) {
- return;
- }
-
- for (Input input : inputs.getInputs()) {
- removeProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE);
- }
- }
-
- public void removeOutputFeeds(Outputs outputs, Vertex processVertex) {
- if (outputs == null) {
- return;
- }
-
- for (Output output : outputs.getOutputs()) {
- removeProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE);
- }
- }
-
- public void removeProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
- Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
- if (feedVertex == null) {
- LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
- throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
- }
-
- if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) {
- removeEdge(feedVertex, processVertex, edgeLabel.getName());
- } else {
- removeEdge(processVertex, feedVertex, edgeLabel.getName());
- }
- }
-
- private void updateProcessOutputs(Outputs oldProcessOutputs, Outputs newProcessOutputs,
- Vertex processEntityVertex) {
- if (areSame(oldProcessOutputs, newProcessOutputs)) {
- return;
- }
-
- removeOutputFeeds(oldProcessOutputs, processEntityVertex);
- addOutputFeeds(newProcessOutputs, processEntityVertex);
- }
-
- public static boolean areSame(Outputs oldProcessOutputs, Outputs newProcessOutputs) {
- if (oldProcessOutputs == null && newProcessOutputs == null) {
- return true;
- }
-
- if (oldProcessOutputs == null || newProcessOutputs == null
- || oldProcessOutputs.getOutputs().size() != newProcessOutputs.getOutputs().size()) {
- return false;
- }
-
- List<Output> oldOutputs = oldProcessOutputs.getOutputs();
- List<Output> newOutputs = newProcessOutputs.getOutputs();
-
- return oldOutputs.size() == newOutputs.size()
- && oldOutputs.containsAll(newOutputs)
- && newOutputs.containsAll(oldOutputs);
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java b/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
deleted file mode 100644
index 8bec02f..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import com.tinkerpop.blueprints.Direction;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Graph;
-import com.tinkerpop.blueprints.Vertex;
-import com.tinkerpop.blueprints.util.io.graphson.GraphSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Utility class for graph operations.
- */
-public final class GraphUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(GraphUtils.class);
-
- private GraphUtils() {
- }
-
- public static void dumpToLog(final Graph graph) {
- LOG.debug("Vertices of {}", graph);
- for (Vertex vertex : graph.getVertices()) {
- LOG.debug(vertexString(vertex));
- }
-
- LOG.debug("Edges of {}", graph);
- for (Edge edge : graph.getEdges()) {
- LOG.debug(edgeString(edge));
- }
- }
-
- public static void dump(final Graph graph) throws IOException {
- dump(graph, System.out);
- }
-
- public static void dump(final Graph graph, OutputStream outputStream) throws IOException {
- GraphSONWriter.outputGraph(graph, outputStream);
- }
-
- public static void dump(final Graph graph, String fileName) throws IOException {
- GraphSONWriter.outputGraph(graph, fileName);
- }
-
- public static String vertexString(final Vertex vertex) {
- StringBuilder properties = new StringBuilder();
- for (String propertyKey : vertex.getPropertyKeys()) {
- properties.append(propertyKey)
- .append("=").append(vertex.getProperty(propertyKey))
- .append(", ");
- }
-
- return "v[" + vertex.getId() + "], Properties[" + properties + "]";
- }
-
- public static String edgeString(final Edge edge) {
- return "e[" + edge.getLabel() + "], ["
- + edge.getVertex(Direction.OUT).getProperty("name")
- + " -> " + edge.getLabel() + " -> "
- + edge.getVertex(Direction.IN).getProperty("name")
- + "]";
- }
-}