You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/04/23 15:11:57 UTC
[2/3] git commit: CAMEL-7386: camel-openshift component.
CAMEL-7386: camel-openshift component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ecdd4707
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ecdd4707
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ecdd4707
Branch: refs/heads/master
Commit: ecdd4707b3a3957b4002344f18a8b84e81912700
Parents: 9d53700
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Apr 23 12:41:54 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 23 12:41:54 2014 +0200
----------------------------------------------------------------------
.../component/openshift/OpenShiftConstants.java | 3 +
.../component/openshift/OpenShiftConsumer.java | 185 ++++++++++++++++++-
.../component/openshift/OpenShiftEndpoint.java | 19 ++
.../component/openshift/OpenShiftHelper.java | 14 ++
.../component/openshift/OpenShiftPollMode.java | 22 +++
.../component/openshift/OpenShiftProducer.java | 11 +-
.../OpenShiftConsumerOnChangeTest.java | 58 ++++++
...penShiftConsumerOnChangeWithInitialTest.java | 58 ++++++
8 files changed, 358 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java
index dcbabbf..1dd0495 100644
--- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java
+++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java
@@ -20,6 +20,9 @@ public final class OpenShiftConstants {
public static final String OPERATION = "CamelOpenShiftOperation";
public static final String APPLICATION = "CamelOpenShiftApplication";
+ public static final String EVENT_TYPE = "CamelOpenShiftEventType";
+ public static final String EVENT_OLD_STATE = "CamelOpenShiftEventOldState";
+ public static final String EVENT_NEW_STATE = "CamelOpenShiftEventNewState";
private OpenShiftConstants() {
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java
index 9cbefd8..0af8ba1 100644
--- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java
+++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.component.openshift;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import com.openshift.client.IApplication;
import com.openshift.client.IDomain;
@@ -27,6 +29,9 @@ import org.apache.camel.impl.ScheduledPollConsumer;
public class OpenShiftConsumer extends ScheduledPollConsumer {
+ private final Map<ApplicationState, ApplicationState> oldState = new HashMap<ApplicationState, ApplicationState>();
+ private volatile boolean initialPoll;
+
public OpenShiftConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
@@ -37,6 +42,12 @@ public class OpenShiftConsumer extends ScheduledPollConsumer {
}
@Override
+ protected void doStart() throws Exception {
+ initialPoll = true;
+ super.doStart();
+ }
+
+ @Override
protected int poll() throws Exception {
String openshiftServer = OpenShiftHelper.getOpenShiftServer(getEndpoint());
IDomain domain = OpenShiftHelper.loginAndGetDomain(getEndpoint(), openshiftServer);
@@ -44,11 +55,15 @@ public class OpenShiftConsumer extends ScheduledPollConsumer {
return 0;
}
- List<IApplication> apps = domain.getApplications();
-
- // TODO grab state
- // TODO: option to only emit if state changes
+ if (getEndpoint().getPollMode().equals(OpenShiftPollMode.all.name())) {
+ return doPollAll(domain);
+ } else {
+ return doPollOnChange(domain);
+ }
+ }
+ protected int doPollAll(IDomain domain) {
+ List<IApplication> apps = domain.getApplications();
for (IApplication app : apps) {
Exchange exchange = getEndpoint().createExchange(app);
try {
@@ -60,8 +75,168 @@ public class OpenShiftConsumer extends ScheduledPollConsumer {
getExceptionHandler().handleException("Error during processing exchange.", exchange, exchange.getException());
}
}
-
return apps.size();
}
+ protected int doPollOnChange(IDomain domain) {
+
+ // an app can either be
+ // - added
+ // - removed
+ // - state changed
+
+ Map<ApplicationState, ApplicationState> newState = new HashMap<ApplicationState, ApplicationState>();
+
+ List<IApplication> apps = domain.getApplications();
+ for (IApplication app : apps) {
+ ApplicationState state = new ApplicationState(app.getUUID(), app, OpenShiftHelper.getStateForApplication(app));
+ newState.put(state, state);
+ }
+
+ // compute what is the delta from last time
+ // so we split up into 3 groups, of added/removed/changed
+ Map<ApplicationState, ApplicationState> added = new HashMap<ApplicationState, ApplicationState>();
+ Map<ApplicationState, ApplicationState> removed = new HashMap<ApplicationState, ApplicationState>();
+ Map<ApplicationState, ApplicationState> changed = new HashMap<ApplicationState, ApplicationState>();
+
+ for (ApplicationState state : newState.keySet()) {
+ if (!oldState.containsKey(state)) {
+ // its a new app added
+ added.put(state, state);
+ } else {
+ ApplicationState old = oldState.get(state);
+ if (old != null && !old.getState().equals(state.getState())) {
+ // the state changed
+ state.setOldState(old.getState());
+ changed.put(state, state);
+ }
+ }
+ }
+ for (ApplicationState state : oldState.keySet()) {
+ if (!newState.containsKey(state)) {
+ // its a app removed
+ removed.put(state, state);
+ }
+ }
+
+ // only emit if needed
+ int processed = 0;
+ if (!initialPoll || initialPoll && getEndpoint().getPollMode().equals(OpenShiftPollMode.onChangeWithInitial.name())) {
+
+ for (ApplicationState add : added.keySet()) {
+ Exchange exchange = getEndpoint().createExchange(add.getApplication());
+ exchange.getIn().setHeader(OpenShiftConstants.EVENT_TYPE, "added");
+ try {
+ processed++;
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error during processing exchange.", exchange, exchange.getException());
+ }
+ }
+ for (ApplicationState remove : removed.keySet()) {
+ Exchange exchange = getEndpoint().createExchange(remove.getApplication());
+ exchange.getIn().setHeader(OpenShiftConstants.EVENT_TYPE, "removed");
+ try {
+ processed++;
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error during processing exchange.", exchange, exchange.getException());
+ }
+ }
+
+ for (ApplicationState change : changed.keySet()) {
+ Exchange exchange = getEndpoint().createExchange(change.getApplication());
+ exchange.getIn().setHeader(OpenShiftConstants.EVENT_TYPE, "changed");
+ exchange.getIn().setHeader(OpenShiftConstants.EVENT_OLD_STATE, change.getOldState());
+ exchange.getIn().setHeader(OpenShiftConstants.EVENT_NEW_STATE, change.getState());
+ try {
+ processed++;
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error during processing exchange.", exchange, exchange.getException());
+ }
+ }
+ }
+
+ // update old state with latest state for next poll
+ oldState.clear();
+ oldState.putAll(newState);
+
+ initialPoll = false;
+
+ return processed;
+ }
+
+ private static final class ApplicationState {
+ private final String uuid;
+ private final IApplication application;
+ private final String state;
+ private String oldState;
+
+ private ApplicationState(String uuid, IApplication application, String state) {
+ this.uuid = uuid;
+ this.application = application;
+ this.state = state;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public IApplication getApplication() {
+ return application;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public String getOldState() {
+ return oldState;
+ }
+
+ public void setOldState(String oldState) {
+ this.oldState = oldState;
+ }
+
+ // only use uuid and state for equals as that is what we want to use for state change detection
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ApplicationState that = (ApplicationState) o;
+
+ if (!state.equals(that.state)) {
+ return false;
+ }
+ if (!uuid.equals(that.uuid)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = uuid.hashCode();
+ result = 31 * result + state.hashCode();
+ return result;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java
index 0735762..2fe6aef 100644
--- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java
+++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java
@@ -45,6 +45,8 @@ public class OpenShiftEndpoint extends ScheduledPollEndpoint {
private String operation;
@UriParam
private String application;
+ @UriParam
+ private String pollMode = OpenShiftPollMode.all.name();
public OpenShiftEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
@@ -55,11 +57,16 @@ public class OpenShiftEndpoint extends ScheduledPollEndpoint {
ObjectHelper.notEmpty(clientId, "clientId", this);
ObjectHelper.notEmpty(username, "username", this);
ObjectHelper.notEmpty(password, "password", this);
+
return new OpenShiftProducer(this);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
+ ObjectHelper.notEmpty(clientId, "clientId", this);
+ ObjectHelper.notEmpty(username, "username", this);
+ ObjectHelper.notEmpty(password, "password", this);
+
Consumer consumer = new OpenShiftConsumer(this, processor);
configureConsumer(consumer);
return consumer;
@@ -135,4 +142,16 @@ public class OpenShiftEndpoint extends ScheduledPollEndpoint {
public void setApplication(String application) {
this.application = application;
}
+
+ public String getPollMode() {
+ return pollMode;
+ }
+
+ public void setPollMode(String pollMode) {
+ this.pollMode = pollMode;
+ }
+
+ public void setPollMode(OpenShiftPollMode pollMode) {
+ this.pollMode = pollMode.name();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java
index cb7db5e..757ce0f 100644
--- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java
+++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java
@@ -17,8 +17,12 @@
package org.apache.camel.component.openshift;
import java.io.IOException;
+import java.util.Locale;
+import com.openshift.client.IApplication;
import com.openshift.client.IDomain;
+import com.openshift.client.IGear;
+import com.openshift.client.IGearGroup;
import com.openshift.client.IOpenShiftConnection;
import com.openshift.client.IUser;
import com.openshift.client.OpenShiftConnectionFactory;
@@ -58,4 +62,14 @@ public final class OpenShiftHelper {
return domain;
}
+
+ public static String getStateForApplication(IApplication application) {
+ for (IGearGroup group : application.getGearGroups()) {
+ for (IGear gear : group.getGears()) {
+ String state = gear.getState().name().toLowerCase(Locale.ENGLISH);
+ return state;
+ }
+ }
+ return "unknown";
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftPollMode.java
----------------------------------------------------------------------
diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftPollMode.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftPollMode.java
new file mode 100644
index 0000000..64f534a
--- /dev/null
+++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftPollMode.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.openshift;
+
+public enum OpenShiftPollMode {
+
+ all, onChange, onChangeWithInitial
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java
index d453be4..47714e8 100644
--- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java
+++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java
@@ -80,6 +80,8 @@ public class OpenShiftProducer extends DefaultProducer {
protected void doList(Exchange exchange, IDomain domain) {
StringBuilder sb = new StringBuilder("{\n \"applications\": [");
+ // TODO: option to output as pojo or json
+
boolean first = true;
for (IApplication application : domain.getApplications()) {
if (!first) {
@@ -203,13 +205,8 @@ public class OpenShiftProducer extends DefaultProducer {
if (app == null) {
throw new CamelExchangeException("Application with id " + name + " not found.", exchange);
} else {
- for (IGearGroup group : app.getGearGroups()) {
- for (IGear gear : group.getGears()) {
- String state = gear.getState().name().toLowerCase(Locale.ENGLISH);
- exchange.getIn().setBody(state);
- break;
- }
- }
+ String state = OpenShiftHelper.getStateForApplication(app);
+ exchange.getIn().setBody(state);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeTest.java b/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeTest.java
new file mode 100644
index 0000000..4b7199c
--- /dev/null
+++ b/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.openshift;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class OpenShiftConsumerOnChangeTest extends CamelTestSupport {
+
+ private String username;
+ private String password;
+
+ @Override
+ public void setUp() throws Exception {
+ // INSERT credentials here
+ username = null;
+ password = null;
+ super.setUp();
+ }
+
+ @Test
+ public void testConsumer() throws Exception {
+ if (username == null) {
+ return;
+ }
+
+ getMockEndpoint("mock:result").expectedMinimumMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ fromF("openshift:myApp?username=%s&password=%s&delay=5s&pollMode=onChange", username, password)
+ .log("Event ${header.CamelOpenShiftEventType} for app ${body.name}")
+ .to("mock:result");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeWithInitialTest.java
----------------------------------------------------------------------
diff --git a/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeWithInitialTest.java b/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeWithInitialTest.java
new file mode 100644
index 0000000..7502ab9
--- /dev/null
+++ b/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeWithInitialTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.openshift;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class OpenShiftConsumerOnChangeWithInitialTest extends CamelTestSupport {
+
+ private String username;
+ private String password;
+
+ @Override
+ public void setUp() throws Exception {
+ // INSERT credentials here
+ username = null;
+ password = null;
+ super.setUp();
+ }
+
+ @Test
+ public void testConsumer() throws Exception {
+ if (username == null) {
+ return;
+ }
+
+ getMockEndpoint("mock:result").expectedMinimumMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ fromF("openshift:myApp?username=%s&password=%s&delay=5s&pollMode=onChangeWithInitial", username, password)
+ .log("Event ${header.CamelOpenShiftEventType} for app ${body.name}")
+ .to("mock:result");
+ }
+ };
+ }
+}